[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2021-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=570723&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-570723
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/21 19:17
Start Date: 23/Mar/21 19:17
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-805165726


   IIRC we could not get the Bigtable integration tests working with this PR. 
Also, I think some folks on the Google side were going to improve this or look 
into it, but I'm not sure what work happened. I would like to get this PR to a 
good working state (including working ITs) before submitting. I think 
submitting a connector that is not fully fleshed out would do more harm than 
good.
   
   Another option would be to add a multi-language wrapper for the already 
established Java Bigtable connector, which should work for portable runners 
and Dataflow Runner v2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 570723)
Time Spent: 50h 40m  (was: 50.5h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Priority: P3
>  Time Spent: 50h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2021-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=570619&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-570619
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/21 16:27
Start Date: 23/Mar/21 16:27
Worklog Time Spent: 10m 
  Work Description: param17 commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-805044580


   Can we re-open this? What's the next step?
   cc: @mf2199 




Issue Time Tracking
---

Worklog Id: (was: 570619)
Time Spent: 50.5h  (was: 50h 20m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-12-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=528402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-528402
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 25/Dec/20 11:30
Start Date: 25/Dec/20 11:30
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-751236035


   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   





Issue Time Tracking
---

Worklog Id: (was: 528402)
Time Spent: 50h 20m  (was: 50h 10m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-12-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=528401&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-528401
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 25/Dec/20 11:29
Start Date: 25/Dec/20 11:29
Worklog Time Spent: 10m 
  Work Description: stale[bot] closed pull request #11295:
URL: https://github.com/apache/beam/pull/11295


   





Issue Time Tracking
---

Worklog Id: (was: 528401)
Time Spent: 50h 10m  (was: 50h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-12-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=523503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-523503
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 13/Dec/20 01:58
Start Date: 13/Dec/20 01:58
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-743934679


   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   





Issue Time Tracking
---

Worklog Id: (was: 523503)
Time Spent: 50h  (was: 49h 50m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-08-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=470392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-470392
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 13/Aug/20 19:58
Start Date: 13/Aug/20 19:58
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-673680680


   @mf2199 - What is the next step on this PR?





Issue Time Tracking
---

Worklog Id: (was: 470392)
Time Spent: 49h 50m  (was: 49h 40m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-07-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=464754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-464754
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 30/Jul/20 21:57
Start Date: 30/Jul/20 21:57
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on a change in pull request #11295:
URL: https://github.com/apache/beam/pull/11295#discussion_r463294587



##
File path: sdks/python/apache_beam/io/gcp/bigtableio.py
##
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""BigTable connector
-
-This module implements writing to BigTable tables.

Review comment:
   If I understand correctly, we are to restore this file to its original 
version while keeping the altered one in the `experimental` folder. If so, 
then it's done.







Issue Time Tracking
---

Worklog Id: (was: 464754)
Time Spent: 49h 40m  (was: 49.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=462149&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-462149
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Jul/20 17:02
Start Date: 22/Jul/20 17:02
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#11295:
URL: https://github.com/apache/beam/pull/11295#discussion_r458937989



##
File path: sdks/python/apache_beam/io/gcp/bigtableio.py
##
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""BigTable connector
-
-This module implements writing to BigTable tables.

Review comment:
   Unfortunately we forgot to mark this as experimental, so we'll have to 
leave the sink here and add the new source to "experimental/bigtableio.py" for 
backwards compatibility. When we are confident about the performance of the 
source we can move it here. For example, I think we need to add support for 
dynamic work rebalancing similar to the Java BigTable source (this can be in a 
future PR).
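
   The backwards-compatibility arrangement described above (leave the old sink 
where it is, land the new code under `experimental/`) can be sketched with a 
small re-export shim. Everything below is illustrative: the class, the 
constructor signature, and the module names are assumptions, not the actual 
Beam code.

```python
import warnings


class WriteToBigTable:
    """Stand-in for the implementation that now lives in the
    experimental module (e.g. experimental/bigtableio.py)."""
    def __init__(self, project_id, instance_id, table_id):
        self.project_id = project_id
        self.instance_id = instance_id
        self.table_id = table_id


def deprecated_alias(new_cls, old_module):
    """Return a subclass that behaves like new_cls but warns when it is
    instantiated through the old import location."""
    class _Alias(new_cls):
        def __init__(self, *args, **kwargs):
            warnings.warn(
                '%s is kept in %s only for backwards compatibility; '
                'the maintained version lives in the experimental package'
                % (new_cls.__name__, old_module),
                DeprecationWarning, stacklevel=2)
            super().__init__(*args, **kwargs)
    _Alias.__name__ = new_cls.__name__
    return _Alias


# The legacy module would re-export the alias under the original name.
LegacyWriteToBigTable = deprecated_alias(
    WriteToBigTable, 'apache_beam.io.gcp.bigtableio')
```

   Instantiating `LegacyWriteToBigTable` behaves exactly like the new class 
but emits a `DeprecationWarning`, so existing pipelines keep working while 
being nudged toward the experimental module.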

##
File path: 
sdks/python/apache_beam/io/gcp/experimental/bigtableio_read_it_test.py
##
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GC Bigtable connector [read]."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import argparse
+import logging
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.gcp.experimental.bigtableio import ReadFromBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineState
+
+try:
+  from google.cloud.bigtable import Client
+except ImportError:
+  Client = None
+
+
+@unittest.skipIf(Client is None, 'GC Bigtable dependencies are not installed')
+class BigtableReadTest(unittest.TestCase):
+  """ Bigtable Read Connector Test
+
+  This tests the ReadFromBigtable connector class via reading rows from
+  a Bigtable table and comparing the `Rows Read` metrics with the row
+  count known a priori.
+  """
+  def setUp(self):
+    self.options = parse_command_line_arguments()
+
+    logging.info('\nProject ID:  %s', self.options['project'])
+    logging.info('\nInstance ID: %s', self.options['instance'])
+    logging.info('\nTable ID:    %s', self.options['table'])
+
+    self._p_options = PipelineOptions(**self.options)
+    self._p_options.view_as(SetupOptions).save_main_session = True
+
+    logging.getLogger().setLevel(self.options['log_level'])
+
+    # [OPTIONAL] Uncomment to save logs into a file
+    # self._setup_log_file()
+
+    # [OPTIONAL] Uncomment this to allow logging the pipeline options
+    # for key, value in self._p_options.get_all_options().items():
+    #   logging.info('Pipeline option {:32s} : {}'.format(key, value))
+
+  def _setup_log_file(self):
+    if self.options['log_directory']:
+      # logging.basicConfig(
+      #     filename='{}{}.log'.format(
+      #         options['log_directory'], options['table']
+      #     ),
+      #     filemode='w',
+      #     level=logging.DEBUG
+      # )
+
+      # Forward all the logs to a file
+      fh = logging.File

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=462143&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-462143
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Jul/20 16:44
Start Date: 22/Jul/20 16:44
Worklog Time Spent: 10m 
  Work Description: chamikaramj edited a comment on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-662555479


   Logs were not available anymore. Re-triggered.





Issue Time Tracking
---

Worklog Id: (was: 462143)
Time Spent: 49h 20m  (was: 49h 10m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=462138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-462138
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Jul/20 16:31
Start Date: 22/Jul/20 16:31
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-662555479


   Logs were not available anymore. Re-triggered.





Issue Time Tracking
---

Worklog Id: (was: 462138)
Time Spent: 49h 10m  (was: 49h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-07-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=462136&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-462136
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Jul/20 16:31
Start Date: 22/Jul/20 16:31
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-662555196


   Retest this please





Issue Time Tracking
---

Worklog Id: (was: 462136)
Time Spent: 49h  (was: 48h 50m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=449279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-449279
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Jun/20 14:15
Start Date: 22/Jun/20 14:15
Worklog Time Spent: 10m 
  Work Description: mf2199 edited a comment on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-647196061


   @chamikaramj @aaltay The build errors point to 'missing' arguments that 
are absent by design, e.g.:
   ```
   15:41:25 E   ValueError: Pipeline has validations errors: 
   15:41:25 E   Missing required option: project.
   15:41:25 E   Missing required option: region.
   15:41:25 E   Missing GCS path option: temp_location.
   15:41:25 E   Missing GCS path option: staging_location.
   ```
   [from "Run Python PreCommit" log]
   
   These are meant to be supplied via the command line, as they may contain 
non-public information and are subject to change. 
   
   Any ideas?
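
   The pattern described here, required options supplied on the command line 
rather than checked into the test source, can be sketched with `argparse`. 
The helper name `parse_command_line_arguments` and the exact flag set are 
assumptions for illustration, not necessarily what the PR does:

```python
import argparse


def parse_command_line_arguments(argv=None):
    """Collect options that must not be hard-coded in the test source,
    since they may contain non-public values and are subject to change."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--project', required=True,
                        help='GCP project owning the Bigtable instance')
    parser.add_argument('--instance', required=True,
                        help='Bigtable instance ID')
    parser.add_argument('--table', required=True,
                        help='Bigtable table ID')
    parser.add_argument('--region', default='us-central1',
                        help='Region to run the Dataflow job in (assumed default)')
    parser.add_argument('--temp_location',
                        help='GCS path for temporary files')
    parser.add_argument('--staging_location',
                        help='GCS path for staged artifacts')
    # parse_known_args ignores any extra flags meant for the pipeline itself.
    known, _ = parser.parse_known_args(argv)
    return vars(known)
```

   Invoking the test with `--project`, `--instance`, and `--table` flags then 
avoids the "Missing required option" validation errors without committing any 
project-specific values to the repository.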





Issue Time Tracking
---

Worklog Id: (was: 449279)
Time Spent: 48h 50m  (was: 48h 40m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=448950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-448950
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 21/Jun/20 23:33
Start Date: 21/Jun/20 23:33
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-647196061


   @chamikaramj @aaltay At this point, the errors point to missing arguments 
that are absent by design, e.g.:
   ```
   15:41:25 E   ValueError: Pipeline has validations errors: 
   15:41:25 E   Missing required option: project.
   15:41:25 E   Missing required option: region.
   15:41:25 E   Missing GCS path option: temp_location.
   15:41:25 E   Missing GCS path option: staging_location.
   ```
   [from "Run Python PreCommit" log]
   
   These were left out purposely, as they may contain non-public information 
and are subject to change. 
   
   Any ideas?





Issue Time Tracking
---

Worklog Id: (was: 448950)
Time Spent: 48h 40m  (was: 48.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=448151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-448151
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 19/Jun/20 01:00
Start Date: 19/Jun/20 01:00
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #11295:
URL: https://github.com/apache/beam/pull/11295#issuecomment-646378304


   retest this please





Issue Time Tracking
---

Worklog Id: (was: 448151)
Time Spent: 48.5h  (was: 48h 20m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=442376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442376
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Jun/20 00:35
Start Date: 07/Jun/20 00:35
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-640137292


   Thanks.





Issue Time Tracking
---

Worklog Id: (was: 442376)
Time Spent: 48h 20m  (was: 48h 10m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=442375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442375
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Jun/20 00:27
Start Date: 07/Jun/20 00:27
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-640136676


   @chamikaramj The other PR uses a different branch. I'm gonna update it then.





Issue Time Tracking
---

Worklog Id: (was: 442375)
Time Spent: 48h 10m  (was: 48h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=442340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442340
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Jun/20 00:25
Start Date: 07/Jun/20 00:25
Worklog Time Spent: 10m 
  Work Description: chamikaramj closed pull request #8457:
URL: https://github.com/apache/beam/pull/8457


   





Issue Time Tracking
---

Worklog Id: (was: 442340)
Time Spent: 47h 50m  (was: 47h 40m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=442343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442343
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Jun/20 00:25
Start Date: 07/Jun/20 00:25
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-640136370


   I reopened that PR and triggered tests. Please address any failures. Let's 
continue the review there.
   
   Closing this PR.





Issue Time Tracking
---

Worklog Id: (was: 442343)
Time Spent: 48h  (was: 47h 50m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-06-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=442088&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-442088
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 06/Jun/20 00:20
Start Date: 06/Jun/20 00:20
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-639912763


   > There are still failing tests on #11295. @mf2199 - What is the next step 
for this PR?
   
   Ping on this? What is our plan for this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 442088)
Time Spent: 47h 40m  (was: 47.5h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: P2
>  Time Spent: 47h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-05-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=436142&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-436142
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 21/May/20 18:32
Start Date: 21/May/20 18:32
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-632270901


   There are still failing tests on https://github.com/apache/beam/pull/11295. 
@mf2199 - What is the next step for this PR?





Issue Time Tracking
---

Worklog Id: (was: 436142)
Time Spent: 47.5h  (was: 47h 20m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: P2
>  Time Spent: 47.5h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=432272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432272
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 08/May/20 22:32
Start Date: 08/May/20 22:32
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-626048359


   Only committers can trigger Jenkins tests. I triggered Python PreCommit and 
PostCommit for the new PR. Let me know if tests should be re-triggered or a 
different test suite should be run.





Issue Time Tracking
---

Worklog Id: (was: 432272)
Time Spent: 47h 20m  (was: 47h 10m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 47h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=432271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432271
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 08/May/20 22:29
Start Date: 08/May/20 22:29
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-626047468


   @chamikaramj  Fixed an error and tried to re-trigger Jenkins with PR 
[#11295](https://github.com/apache/beam/pull/11295/) - still no luck. Maybe 
it's really worth asking around.





Issue Time Tracking
---

Worklog Id: (was: 432271)
Time Spent: 47h 10m  (was: 47h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 47h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=432206&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432206
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 08/May/20 17:56
Start Date: 08/May/20 17:56
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457:
URL: https://github.com/apache/beam/pull/8457#issuecomment-625940125


   +1 for starting a new PR. It's surprising to hear that the Jenkins IT trigger 
does not pick up your updates. Hopefully you won't run into this in the new 
PR; if you do, it's probably worth an email to the dev list to check whether 
someone else has run into it.





Issue Time Tracking
---

Worklog Id: (was: 432206)
Time Spent: 47h  (was: 46h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 47h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=424308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424308
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 17/Apr/20 17:35
Start Date: 17/Apr/20 17:35
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-615374068
 
 
   > Can you please address test failures and conflicts?
   
   @chamikaramj  For some reason there no longer appear to be any conflicts.
 



Issue Time Tracking
---

Worklog Id: (was: 424308)
Time Spent: 46h 50m  (was: 46h 40m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 46h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=424305&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424305
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 17/Apr/20 17:34
Start Date: 17/Apr/20 17:34
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r410370076
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 Review comment:
   So far it's been tested on tables containing up to 10^10 rows, which should 
be sufficient for most purposes. Read times increase roughly linearly with 
table size. The total read time for 10^10 rows is just under 3 hours, with a 
high degree of consistency. That is with maxed-out parallelization for 
'us-central1', i.e. 600 workers total. Reading even larger tables seems quite 
possible, as long as spending weeks, possibly months, on such tests is not 
an issue.
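As a sanity check on the figures quoted above, the implied throughput can be worked out directly. The numbers (10^10 rows, ~3 hours, 600 workers) are taken from the comment, not re-measured:

```python
# Back-of-the-envelope throughput implied by the quoted benchmark:
# 10**10 rows read in about 3 hours across 600 Dataflow workers.
rows = 10**10
seconds = 3 * 3600
workers = 600

aggregate_rate = rows / seconds            # rows/sec across the whole pipeline
per_worker_rate = aggregate_rate / workers # rows/sec per worker

print(f"aggregate:  {aggregate_rate:,.0f} rows/s")
print(f"per worker: {per_worker_rate:,.0f} rows/s")
```

This works out to roughly 0.9M rows/s in aggregate, consistent with the claim that read time scales about linearly with table size at fixed worker count.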
 



Issue Time Tracking
---

Worklog Id: (was: 424305)
Time Spent: 46h 40m  (was: 46.5h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 46h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=420526&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420526
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Apr/20 21:34
Start Date: 10/Apr/20 21:34
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r406953225
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from s Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
+key_1 = keys[i - 1].row_key
+key_2 = keys[i].row_key
+size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+yield iobase.SourceBundle(size, None, key_1, key_2)
+
+return (pbegin
+| 'Bundles' >> beam.Create(iter(ch

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=417146&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-417146
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 06/Apr/20 21:33
Start Date: 06/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r404401748
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from s Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
+key_1 = keys[i - 1].row_key
+key_2 = keys[i].row_key
+size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+yield iobase.SourceBundle(size, None, key_1, key_2)
 
 Review comment:
   Done.
 
---

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=417055&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-417055
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 06/Apr/20 19:30
Start Date: 06/Apr/20 19:30
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r404336039
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from s Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
 
 Review comment:
   A very concise one, but done.
 


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=417040&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-417040
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 06/Apr/20 19:05
Start Date: 06/Apr/20 19:05
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r404322413
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from s Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
 
 Review comment:
   Yes, indeed. A CBT Table returns at least one sample row key, to which we 
unconditionally add another one. An empty `sample_row_keys` list can only occur 
if the table is not valid or something else broke down the line. So for that 
matter w
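For illustration, here is a minimal, Beam-free sketch of the bundling logic discussed in this thread: adjacent pairs of sampled row keys bound one key range each, sized by the difference of their byte offsets. The key values and offsets below are invented, and plain tuples stand in for `iobase.SourceBundle`:

```python
# Sketch (not the Beam code itself) of splitting a table into key ranges
# from sampled row keys. Key names and offsets are made up for illustration.
from collections import namedtuple

SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")

# What table.sample_row_keys() might yield, plus the sentinel first key
# (b'', offset 0) that the transform inserts unconditionally.
keys = [SampleRowKey(b"", 0),
        SampleRowKey(b"key-100", 1024),
        SampleRowKey(b"key-200", 4096)]

def chunks(keys):
    # Each adjacent pair of sample keys bounds one bundle; the size
    # estimate is the difference of their byte offsets.
    for prev, cur in zip(keys, keys[1:]):
        yield (cur.offset_bytes - prev.offset_bytes, prev.row_key, cur.row_key)

bundles = list(chunks(keys))
print(bundles)
```

With at least one sampled key plus the sentinel, `chunks()` always produces at least one bundle, which matches the observation above that an empty result would indicate an invalid table.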

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=415200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415200
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Apr/20 00:18
Start Date: 03/Apr/20 00:18
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r402668347
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 Review comment:
   Yes, you can test it, as long as the extra package name differs from 
`bigtableio`. Otherwise there is a namespace conflict, in which case Dataflow will 
prefer the previous code version that does not yet have the new functionality. 
See my comments from 24 February.
 



Issue Time Tracking
---

Worklog Id: (was: 415200)
Time Spent: 45h 50m  (was: 45h 40m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 45h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=415190&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415190
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Apr/20 00:06
Start Date: 03/Apr/20 00:06
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r402664846
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
 
 Review comment:
   Done.
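   The `__getstate__`/`__setstate__` pair in the hunk above exists so the DoFn survives pickling: only the plain options dict crosses the wire, and the Bigtable client handle is rebuilt lazily on the worker in `start_bundle`. A minimal stdlib sketch of that pattern (the `ReadFn` class and its fields are illustrative, not Beam API):

```python
import pickle

class ReadFn:
    """Illustrative stand-in for the DoFn's pickling pattern above."""

    def __init__(self, options):
        self._initialize(options)

    def _initialize(self, options):
        self._options = options
        self._table = None  # client handle; recreated on the worker

    def __getstate__(self):
        # Only the plain-dict configuration is serialized.
        return self._options

    def __setstate__(self, options):
        # Restores the defaults, so _table is None again after unpickling.
        self._initialize(options)

fn = ReadFn({'project_id': 'p', 'instance_id': 'i', 'table_id': 't'})
fn._table = object()  # simulate an unpicklable live client handle
clone = pickle.loads(pickle.dumps(fn))
print(clone._options == fn._options, clone._table)
```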
 



Issue Time Tracking
---

Worklog Id: (was: 415190)
Time Spent: 45h 40m  (was: 45.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408175&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408175
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396691673
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for reading rows in parallel from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If none is provided, reads every column
+in each row.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
 
 Review comment:
   Do we expect to always get at least two keys here? If so, add an assertion 
before this statement so that we don't fall through trivially.
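   A sketch of the suggested guard, using a hypothetical `make_bundles` helper over the key list (after the sentinel key is prepended, fewer than two entries means `sample_row_keys()` returned nothing and the loop would fall through silently):

```python
def make_bundles(keys):
    # Guard: with fewer than two keys, range(1, len(keys)) is empty and the
    # read would silently produce zero bundles instead of failing loudly.
    assert len(keys) >= 2, 'expected at least one sampled row key'
    return [(keys[i - 1], keys[i]) for i in range(1, len(keys))]

print(make_bundles([b'', b'key-100', b'key-200']))
# -> [(b'', b'key-100'), (b'key-100', b'key-200')]
```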
 


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408176&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408176
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396694342
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for reading rows in parallel from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If none is provided, reads every column
+in each row.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
+key_1 = keys[i - 1].row_key
+key_2 = keys[i].row_key
+size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+yield iobase.SourceBundle(size, None, key_1, key_2)
+
+return (pbegin
+| 'Bundles' >> beam.Create(it

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408171&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408171
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396659933
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
 
 Review comment:
   Please add docs and annotations to denote that this module and its 
PTransforms (both source and sink) are experimental, similar to the following:
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py#L52
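   The mongodbio module linked above uses the `@experimental()` annotation from `apache_beam.utils.annotations`, which warns callers at use time. A stdlib-only mimic of the idea, to show the intended effect (this is not the Beam implementation, and the decorated factory is hypothetical):

```python
import functools
import warnings

def experimental(label='transform'):
    # Minimal mimic of Beam's annotation: warn the caller that the
    # decorated object's API may change without notice.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            warnings.warn('%s %s is experimental.' % (label, fn.__name__),
                          FutureWarning, stacklevel=2)
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@experimental()
def read_from_bigtable():  # hypothetical stand-in for the PTransform
    return 'ReadFromBigtable'

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    result = read_from_bigtable()

print(result, [str(w.message) for w in caught])
```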
 



Issue Time Tracking
---

Worklog Id: (was: 408171)
Time Spent: 44h 50m  (was: 44h 40m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408172
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396690605
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for reading rows in parallel from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If none is provided, reads every column
+in each row.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
 
 Review comment:
   Please add a comment to explain why this is needed.
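   A likely explanation, sketched with stdlib types only: `sample_row_keys()` reports the *end* key of each sampled chunk together with the cumulative byte offset up to that key, so prepending an empty sentinel key gives the first bundle a start boundary at the very beginning of the table. The sample values below are made up for illustration:

```python
from collections import namedtuple

SampleRowKey = namedtuple('SampleRowKey', 'row_key offset_bytes')

# Made-up samples: each entry is the END key of a chunk plus the cumulative
# byte offset up to and including that key, as the SampleRowKeys RPC
# reports them.
keys = [SampleRowKey(b'key-100', 1024), SampleRowKey(b'key-200', 2048)]

# The sentinel supplies a start boundary for the first chunk, so the bundles
# below cover the table from its very first row.
keys.insert(0, SampleRowKey(b'', 0))

bundles = [(keys[i].offset_bytes - keys[i - 1].offset_bytes,  # size estimate
            keys[i - 1].row_key,                              # start key
            keys[i].row_key)                                  # stop key
           for i in range(1, len(keys))]
print(bundles)  # -> [(1024, b'', b'key-100'), (1024, b'key-100', b'key-200')]
```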
 


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408173&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408173
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396693125
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for reading rows in parallel from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If none is provided, reads every column
+in each row.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
+key_1 = keys[i - 1].row_key
+key_2 = keys[i].row_key
+size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+yield iobase.SourceBundle(size, None, key_1, key_2)
 
 Review comment:
   You don't need to use SourceTestBundle if 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408174
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396695008
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 Review comment:
   Is this test working now? (If so, I can try triggering it from the PR.)
 



Issue Time Tracking
---

Worklog Id: (was: 408174)
Time Spent: 45h 20m  (was: 45h 10m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408177&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408177
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 19:15
Start Date: 23/Mar/20 19:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396695591
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 Review comment:
   Can you try triggering this for a large dataset to make sure that there's no 
severe perf regression?
 



Issue Time Tracking
---

Worklog Id: (was: 408177)
Time Spent: 45.5h  (was: 45h 20m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=408111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408111
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Mar/20 18:10
Start Date: 23/Mar/20 18:10
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-602768456
 
 
   Can you please address the test failures and conflicts?
 



Issue Time Tracking
---

Worklog Id: (was: 408111)
Time Spent: 44h 40m  (was: 44.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-03-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=405476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-405476
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 18/Mar/20 15:02
Start Date: 18/Mar/20 15:02
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-600674160
 
 
   Sorry about the delay. Will take a look this week.
 



Issue Time Tracking
---

Worklog Id: (was: 405476)
Time Spent: 44.5h  (was: 44h 20m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-02-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=393839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393839
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 26/Feb/20 23:28
Start Date: 26/Feb/20 23:28
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-590610828
 
 
   @chamikaramj PTAL. The long-standing issue now appears to be resolved. The 
cause was a namespace conflict between the existing `bigtableio.py` file and 
the extra package needed to run the integration test. As it turns out, the two 
must have different names; otherwise Dataflow discards the package and falls 
back to the existing file, which lacks the newly added classes, hence the 
`AttributeError: 'module' object has no attribute...` error. Apparently this 
was not the case when this PR was originally created, so we now have a caveat:
   
   - Until the code is merged, the only way to run the test is to change the 
package name in the import directive, `from bigtableio import 
ReadFromBigtable`, to something different, and use an external tarball package 
named accordingly. Once merged, the extra package will no longer be necessary 
and the test should run as-is.
   
   This was confirmed by running a sequence of nearly identical tests 
back-to-back; I can provide the instructions separately.
   
   To keep the code changes to a bare minimum, the write part of the test has 
been dropped, since it would only exercise already-accepted code.
   
   Finally, I'd also suggest closing this PR and opening a new one to keep 
things cleaner. Let me know if that is a viable option.
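The module-shadowing failure described above can be reproduced with nothing but the standard library. This is a minimal sketch under stated assumptions: the module name `shadowdemo` and its contents are hypothetical, not from this PR. When a stale copy of a module sits earlier on `sys.path` than a freshly staged copy with the same name, the stale one wins and its missing attributes surface as `AttributeError`/`hasattr` failures.

```python
import importlib
import os
import sys
import tempfile


def write_module(dirname, body):
    # Helper: drop a one-file module named shadowdemo.py into dirname.
    with open(os.path.join(dirname, "shadowdemo.py"), "w") as f:
        f.write(body)


stale_dir = tempfile.mkdtemp()   # stands in for the preexisting bigtableio.py
fresh_dir = tempfile.mkdtemp()   # stands in for the uploaded extra package

write_module(stale_dir, "WRITE_ONLY = True\n")
write_module(fresh_dir, "WRITE_ONLY = True\nREADER = 'ReadFromBigtable'\n")

# The stale directory ends up earlier on sys.path, so it shadows the fresh
# copy, mimicking Dataflow preferring the existing file over the package.
sys.path.insert(0, fresh_dir)
sys.path.insert(0, stale_dir)

mod = importlib.import_module("shadowdemo")
has_reader = hasattr(mod, "READER")  # False: the newly added name is invisible
```

Renaming the staged package, as the comment suggests, sidesteps the shadowing entirely because the two module names no longer collide.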
 



Issue Time Tracking
---

Worklog Id: (was: 393839)
Time Spent: 44h 20m  (was: 44h 10m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 44h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=392205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392205
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 25/Feb/20 00:02
Start Date: 25/Feb/20 00:02
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-590610828
 
 
   @chamikaramj PTAL. The long-standing issue now appears to be resolved. The 
cause was a namespace conflict between the existing `bigtableio.py` file and 
the extra package needed to run the integration test. As it turns out, the two 
must have different names; otherwise Dataflow discards the package and falls 
back to the existing file, which lacks the newly added classes, hence the 
`AttributeError: 'module' object has no attribute...` error. Apparently this 
was not the case when this PR was originally created, so we now have a caveat:
   
   - Until the code is merged, the only way to run the test is to change the 
package name in the import directive, `from bigtableio import 
ReadFromBigtable`, to something different, and use an external tarball package 
named accordingly. Once merged, the extra package will no longer be necessary 
and the test should run as-is.
   
   This was confirmed by running a sequence of nearly identical tests 
back-to-back; I can provide the instructions separately.
   
   To keep the code changes to a bare minimum, the write part of the test has 
been dropped, since it would only exercise already-accepted code.
   
   Finally, I'd also suggest closing this PR and opening a new one to keep 
things cleaner. Let me know if that is a viable option.
 



Issue Time Tracking
---

Worklog Id: (was: 392205)
Time Spent: 44h 10m  (was: 44h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 44h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2020-02-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=384626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384626
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Feb/20 18:16
Start Date: 10/Feb/20 18:16
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-584258741
 
 
   Could you also please add a new feature note to 
https://github.com/apache/beam/blob/master/CHANGES.md before this is merged?
 



Issue Time Tracking
---

Worklog Id: (was: 384626)
Time Spent: 44h  (was: 43h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 44h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=362824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362824
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Dec/19 00:40
Start Date: 24/Dec/19 00:40
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r361033718
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project of to read the Rows
+  instance_id): [str] GCP Instance to read the Rows
+  table_id): [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def _
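The `__getstate__`/`__setstate__` idiom in the hunk above serializes only the plain options dict and rebuilds transient state (the table handle, the counters) after deserialization. Below is a Beam-independent sketch of the same pattern with the pickle round trip spelled out by hand; the class name and the fake `('connected', ...)` handle are illustrative stand-ins, not the real Bigtable client.

```python
class LazyConnectionFn:
    """Serializes only its options; transient state is rebuilt lazily."""

    def __init__(self, project_id, instance_id, table_id):
        self._initialize({'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id})

    def __getstate__(self):
        # Only the picklable options dict crosses the wire to the worker.
        return self._options

    def __setstate__(self, options):
        # On the worker, transient fields start out as None again.
        self._initialize(options)

    def _initialize(self, options):
        self._options = options
        self.table = None  # stands in for the live Bigtable table handle

    def start_bundle(self):
        # Recreate the connection once per bundle, as the DoFn above does.
        if self.table is None:
            self.table = ('connected', self._options['table_id'])


fn = LazyConnectionFn('my-project', 'my-instance', 'my-table')
fn.start_bundle()                      # fn.table is now a live handle

# Hand-rolled version of the pickle round trip that ships a DoFn to a worker:
state = fn.__getstate__()              # what pickle.dumps() would capture
clone = LazyConnectionFn.__new__(LazyConnectionFn)
clone.__setstate__(state)              # what pickle.loads() would invoke
```

After the round trip, `clone.table` is `None` until `start_bundle` runs again, which is why the connection setup is guarded by the `if self.table is None` check.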

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=362811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362811
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Dec/19 00:20
Start Date: 24/Dec/19 00:20
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r361031589
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project of to read the Rows
+  instance_id): [str] GCP Instance to read the Rows
+  table_id): [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=358293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358293
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 12/Dec/19 05:07
Start Date: 12/Dec/19 05:07
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r356960074
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project of to read the Rows
+  instance_id): [str] GCP Instance to read the Rows
+  table_id): [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def _

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=357925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-357925
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 11/Dec/19 15:58
Start Date: 11/Dec/19 15:58
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r356682146
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project of to read the Rows
+  instance_id): [str] GCP Instance to read the Rows
+  table_id): [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=357923&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-357923
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 11/Dec/19 15:58
Start Date: 11/Dec/19 15:58
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r356682146
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project of to read the Rows
+  instance_id): [str] GCP Instance to read the Rows
+  table_id): [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=357908&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-357908
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 11/Dec/19 15:31
Start Date: 11/Dec/19 15:31
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-564597987
 
 
   @mf2199 can you please provide steps for me to reproduce the issue?
 



Issue Time Tracking
---

Worklog Id: (was: 357908)
Time Spent: 43h  (was: 42h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 43h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=357612&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-357612
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 11/Dec/19 04:48
Start Date: 11/Dec/19 04:48
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-564376367
 
 
   [BEAM-1251](https://issues.apache.org/jira/browse/BEAM-1251) suggests a 
Python 3 support issue, so back to Python 2. Rewrote and re-tested the 
connector from scratch. Everything works fine when all the code is contained 
in a single file. After packaging, the code still works with the `direct` 
runner but fails with `dataflow`, with the same `AttributeError`. Followed the 
Dataflow [docs](https://beam.apache.org/documentation/runners/dataflow/), 
[guidelines](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#Autotuning)
 and 
[FAQ](https://cloud.google.com/dataflow/docs/resources/faq#how-do-i-handle-nameerro),
 as well as some 
[Beam](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/)
 and other documentation, yet found no viable solution. The fact that the code 
works both as a standalone file (with both the `Dataflow` and `direct` 
runners) and packaged (with `direct`) points to an environment issue rather 
than the code itself. 
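The dependency-staging remedies in the linked pages come down to two documented Beam Python options: `--setup_file` for a local package with a `setup.py`, and `--extra_package` for a prebuilt sdist tarball. This sketch shows how such flags parse; the paths and the tarball name are illustrative assumptions, and the parser here is only a stand-in for Beam's own (argparse-based) `PipelineOptions`.

```python
import argparse

# Stand-in for Beam's PipelineOptions parsing of the two staging flags.
parser = argparse.ArgumentParser()
parser.add_argument("--setup_file",
                    help="path to the setup.py of a local package to stage")
parser.add_argument("--extra_package", action="append",
                    help="prebuilt sdist tarball shipped to the workers")

# Option 1: ship the local package via its setup.py.
opts, _ = parser.parse_known_args(["--setup_file", "./setup.py"])

# Option 2: ship a prebuilt tarball; its name must not shadow a module
# already present on the workers, which is the failure mode reported here.
alt, _ = parser.parse_known_args(
    ["--extra_package", "./bigtableio_ext-0.1.tar.gz"])
```

`parse_known_args` is used because a real pipeline passes many other flags alongside these two; unknown arguments are simply returned rather than rejected.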
 



Issue Time Tracking
---

Worklog Id: (was: 357612)
Time Spent: 42h 50m  (was: 42h 40m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 42h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.





[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=357610&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-357610
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 11/Dec/19 04:43
Start Date: 11/Dec/19 04:43
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-564376367
 
 
   [BEAM-1251](https://issues.apache.org/jira/browse/BEAM-1251) suggests a 
Python 3 support issue, so I went back to Python 2 and rewrote the connector 
from scratch. Everything works fine when all the code is contained in a single 
file. After packaging, the code still works with the `direct` runner but fails 
with `dataflow`, with the same `AttributeError`. I followed the Dataflow 
[docs](https://beam.apache.org/documentation/runners/dataflow/), 
[guidelines](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#Autotuning), 
and 
[FAQ](https://cloud.google.com/dataflow/docs/resources/faq#how-do-i-handle-nameerro), 
as well as some 
[Beam](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) 
and other documentation, yet found no viable solution. The fact that the code 
works both standalone (with both the `Dataflow` and `direct` runners) and 
packaged (with `direct`) points to an environment issue rather than the code 
itself. 
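
   The failure mode described in this comment, where the code runs fine 
standalone but a packaged copy fails on `dataflow` with an `AttributeError`, 
matches how Python pickles classes by reference. A minimal stdlib-only sketch 
(the class and field names are illustrative; nothing here is from the Beam 
codebase):

   ```python
   import pickle

   # Classes pickle *by reference*: the payload stores only the module path
   # and class name (e.g. "__main__.ReadFn"), never the class definition.
   class ReadFn:  # hypothetical stand-in for a DoFn defined in the main script
       def __init__(self, table_id):
           self.table_id = table_id

   payload = pickle.dumps(ReadFn("my-table"))

   # Unpickling succeeds only in a process that can import the same class.
   clone = pickle.loads(payload)
   assert clone.table_id == "my-table"

   # A worker that cannot import the defining module raises on load:
   #   AttributeError: Can't get attribute 'ReadFn' on <module ...>
   assert b"ReadFn" in payload  # the payload carries the name, not the code
   ```

   The Beam dependency docs linked above describe the usual remedies: package 
the DoFns in an installable module (staged via `--setup_file`) or enable the 
`save_main_session` pipeline option so the main session's state is shipped to 
workers.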
 



Issue Time Tracking
---

Worklog Id: (was: 357610)
Time Spent: 42h 40m  (was: 42.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=357609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-357609
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 11/Dec/19 04:38
Start Date: 11/Dec/19 04:38
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-564376367
 
 
   [BEAM-1251](https://issues.apache.org/jira/browse/BEAM-1251) suggests a 
Python 3 support issue, so I went back to Python 2 and rewrote the connector 
from scratch. Everything works fine when all the code is contained in a single 
file. After packaging, the code still works with the `direct` runner but fails 
with `dataflow`, with the same `AttributeError`. I followed the Dataflow 
[docs](https://beam.apache.org/documentation/runners/dataflow/), 
[guidelines](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#Autotuning), 
and 
[FAQ](https://cloud.google.com/dataflow/docs/resources/faq#how-do-i-handle-nameerro), 
as well as some 
[Beam](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) 
and other documentation, yet found no viable solution. The fact that the code 
works both standalone (with both the `Dataflow` and `direct` runners) and 
packaged (with `direct`) points to an environment issue rather than the code 
itself. This also raises the question of whether whoever wrote the `write` 
part of the connector left the `read` part unfinished for a reason...
 



Issue Time Tracking
---

Worklog Id: (was: 357609)
Time Spent: 42.5h  (was: 42h 20m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=352443&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352443
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Dec/19 05:11
Start Date: 03/Dec/19 05:11
Worklog Time Spent: 10m 
  Work Description: tvalentyn commented on issue #8457: [BEAM-3342] Create 
a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-561001706
 
 
   @mf2199 Re: the AttributeError on Python 3:
   Please take a look at https://issues.apache.org/jira/browse/BEAM-1251, first 
message, last bullet point.
 



Issue Time Tracking
---

Worklog Id: (was: 352443)
Time Spent: 42h 20m  (was: 42h 10m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=352413&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352413
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Dec/19 02:53
Start Date: 03/Dec/19 02:53
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-560975166
 
 
   @chamikaramj  I spoke to Kristen, who showed the logs to someone on the 
Dataflow team and relayed their response. They had not seen errors like that 
before and asked me to repeat the test with Python 3; I did, and the errors 
are the same. 
   
   I've also discovered something new when adding a private class 
(`_BTReadFn`) to `bigtableio.py`. The updated package 
(`bigtableio-0.3.125.tar.gz`) is rebuilt, reinstalled locally, and the 
uploaded copy in the staging folder shows the proper contents. But the pickler 
now throws an exception:
   ```
   AttributeError: Can't get attribute '_BTReadFn' on 
   ```
   This hasn't happened before, but it definitely looks like another 
environment issue and impedes any further testing until solved. I'll keep 
digging, but if you have any suggestions, please advise.
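
   As an editorial aside: the lazy-client pattern the PR's `_BigtableReadFn` 
uses, shipping only a plain options dict and rebuilding the client per worker, 
can be demonstrated with the stdlib alone. The class and field names below are 
illustrative, not the PR's exact code:

   ```python
   import pickle

   class LazyReadFn:  # illustrative sketch of the DoFn's pickling strategy
       def __init__(self, options):
           self._options = options   # plain dict: safe to pickle
           self.table = None         # client handle: created lazily per worker

       def __getstate__(self):
           # Ship only the serializable configuration across the wire.
           return self._options

       def __setstate__(self, options):
           # Rebuild transient state on the worker; the real connector
           # reconnects the first time start_bundle() runs.
           self._options = options
           self.table = None

   fn = LazyReadFn({"project_id": "p", "table_id": "t"})
   fn.table = object()  # pretend a connection was opened locally

   clone = pickle.loads(pickle.dumps(fn))
   assert clone._options == {"project_id": "p", "table_id": "t"}
   assert clone.table is None  # the unpicklable handle was never serialized
   ```

   Note this pattern only avoids pickling unpicklable *state*; it does not 
help if the class itself cannot be imported on the worker, which is what the 
`AttributeError` above suggests.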
   
 



Issue Time Tracking
---

Worklog Id: (was: 352413)
Time Spent: 42h 10m  (was: 42h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=352412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352412
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Dec/19 02:53
Start Date: 03/Dec/19 02:53
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-560975166
 
 
   I spoke to Kristen, who showed the logs to someone on the Dataflow team and 
relayed their response. They had not seen errors like that before and asked me 
to repeat the test with Python 3; I did, and the errors are the same. 
   
   I've also discovered something new when adding a private class 
(`_BTReadFn`) to `bigtableio.py`. The updated package 
(`bigtableio-0.3.125.tar.gz`) is rebuilt, reinstalled locally, and the 
uploaded copy in the staging folder shows the proper contents. But the pickler 
now throws an exception:
   ```
   AttributeError: Can't get attribute '_BTReadFn' on 
   ```
   This hasn't happened before, but it definitely looks like another 
environment issue and impedes any further testing until solved. I'll keep 
digging, but if you have any suggestions, please advise.
   
 



Issue Time Tracking
---

Worklog Id: (was: 352412)
Time Spent: 42h  (was: 41h 50m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=352294&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352294
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 02/Dec/19 23:18
Start Date: 02/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-560871416
 
 
   Thanks for the update. Can you clarify who on the Dataflow team you 
discussed this with? What are the next steps here?
   
   @tvalentyn might be able to help if you ran into Python 3-specific issues.
 



Issue Time Tracking
---

Worklog Id: (was: 352294)
Time Spent: 41h 50m  (was: 41h 40m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-11-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=348236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-348236
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Nov/19 18:11
Start Date: 22/Nov/19 18:11
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-557635542
 
 
   Per the discussion with the Dataflow team, further testing is warranted in 
a Python 3 environment; to be done shortly.
 



Issue Time Tracking
---

Worklog Id: (was: 348236)
Time Spent: 41h 40m  (was: 41.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-11-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=348234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-348234
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 22/Nov/19 18:04
Start Date: 22/Nov/19 18:04
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-557633005
 
 
   Please update the status. It would be good if we can get this in by the end 
of the year.
 



Issue Time Tracking
---

Worklog Id: (was: 348234)
Time Spent: 41.5h  (was: 41h 20m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-11-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=342046&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342046
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 12/Nov/19 17:07
Start Date: 12/Nov/19 17:07
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-552989505
 
 
   Hi @drubinstein @mf2199  - are there any updates on this?
 



Issue Time Tracking
---

Worklog Id: (was: 342046)
Time Spent: 41h 20m  (was: 41h 10m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=329731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329731
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 17/Oct/19 10:17
Start Date: 17/Oct/19 10:17
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-541821885
 
 
   Hi Chamikara,
   
   Just forwarded you some details. It's close to 9 p.m. here in
   Saint Petersburg; I'll get back to it tomorrow.
   
   Max.
   
   
   
   
   Mon, Oct 14, 2019, 20:30, Chamikara Jayalath :
   
   > Any updates here ?
   >
   > Thanks.
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > 
,
   > or unsubscribe
   > 

   > .
   >
   
 



Issue Time Tracking
---

Worklog Id: (was: 329731)
Time Spent: 41h 10m  (was: 41h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=328016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328016
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 14/Oct/19 17:49
Start Date: 14/Oct/19 17:49
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-541821885
 
 
   Hi Chamikara,
   
   Just forwarded you some details. It's close to 9 p.m. here in
   Saint Petersburg; I'll get back to it tomorrow.
   
   Max.
   
   
   
   
   Mon, Oct 14, 2019, 20:30, Chamikara Jayalath :
   
   > Any updates here ?
   >
   > Thanks.
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > 
,
   > or unsubscribe
   > 

   > .
   >
   
 



Issue Time Tracking
---

Worklog Id: (was: 328016)
Time Spent: 41h  (was: 40h 50m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=327994&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327994
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 14/Oct/19 17:30
Start Date: 14/Oct/19 17:30
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-541811674
 
 
   Any updates here ?
   
   Thanks.
 



Issue Time Tracking
---

Worklog Id: (was: 327994)
Time Spent: 40h 50m  (was: 40h 40m)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=319032&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319032
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 26/Sep/19 15:14
Start Date: 26/Sep/19 15:14
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327778002
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
 
 Review comment:
   nit: PEP 8 recommends using parentheses for line continuation instead of 
backslashes.
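
   A quick illustration of that nit, using a stand-in `Client` rather than the 
real Bigtable client:

   ```python
   class Client:  # minimal stand-in mimicking the chained-call API in the diff
       def __init__(self, project):
           self.path = project

       def instance(self, instance_id):
           self.path += "/" + instance_id
           return self

       def table(self, table_id):
           self.path += "/" + table_id
           return self

   # Backslash continuations work but are discouraged by PEP 8; wrapping the
   # whole expression in parentheses gives the same result without them:
   table = (Client(project="my-project")
            .instance("my-instance")
            .table("my-table"))
   assert table.path == "my-project/my-instance/my-table"
   ```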
 



Issue Time Tracking
---

Worklog Id: (was: 319032)
Time Spent: 40h 40m  (was: 40.5h)



[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=317709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317709
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Sep/19 17:47
Start Date: 24/Sep/19 17:47
Worklog Time Spent: 10m 
  Work Description: drubinstein commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327749840
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project of to read the Rows
+  instance_id): [str] GCP Instance to read the Rows
+  table_id): [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=317708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317708
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Sep/19 17:46
Start Date: 24/Sep/19 17:46
Worklog Time Spent: 10m 
  Work Description: drubinstein commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327749283
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that write a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project of to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project of to write the Rows
+  instance_id: [str] GCP Instance to write the Rows
+  table_id: [str] GCP Table to write the `DirectRows`
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  
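The `__getstate__`/`__setstate__` pair in the `_BigtableReadFn` hunk above keeps only the lightweight options dict in the pickled payload and rebuilds transient worker-side state after deserialization. A minimal standalone sketch of that same pattern (plain Python, not the Beam code itself; `_ReadFnSketch` is a hypothetical stand-in class):

```python
import pickle


class _ReadFnSketch(object):
    """Stand-in for the DoFn above: only the options dict is pickled;
    per-bundle state (client/table handle) is rebuilt after unpickling."""

    def __init__(self, project_id, instance_id, table_id, filter_=None):
        self._initialize({'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id,
                          'filter_': filter_})

    def __getstate__(self):
        # Pickle only the lightweight configuration.
        return self._beam_options

    def __setstate__(self, options):
        # Recreate transient state on the worker after deserialization.
        self._initialize(options)

    def _initialize(self, options):
        self._beam_options = options
        self.table = None  # heavy client handle; intentionally never pickled


fn = _ReadFnSketch('my-project', 'my-instance', 'my-table')
fn.table = object()  # simulate an open connection
clone = pickle.loads(pickle.dumps(fn))
assert clone._beam_options['table_id'] == 'my-table'
assert clone.table is None  # transient state was not serialized
```

This is why `start_bundle` lazily creates the `Table` client: after the runner unpickles the DoFn on a worker, `self.table` is `None` again and the connection is re-established there.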

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=317469&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317469
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Sep/19 13:02
Start Date: 24/Sep/19 13:02
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327600980
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=317466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317466
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Sep/19 13:00
Start Date: 24/Sep/19 13:00
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327600215
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
 
 Review comment:
   +1
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 317466)
Time Spent: 40h  (was: 39h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 40h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=317465&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317465
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Sep/19 13:00
Start Date: 24/Sep/19 13:00
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327599781
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=317464&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317464
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 24/Sep/19 12:59
Start Date: 24/Sep/19 12:59
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r327599781
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=315295&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315295
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 19/Sep/19 20:18
Start Date: 19/Sep/19 20:18
Worklog Time Spent: 10m 
  Work Description: drubinstein commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r326364428
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=315294&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315294
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 19/Sep/19 20:17
Start Date: 19/Sep/19 20:17
Worklog Time Spent: 10m 
  Work Description: drubinstein commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r326364807
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
 
 Review comment:
   `filter_` should probably default to `None`, since that is the default for 
`read_rows`. If you pass in an empty bytestring like you do here, you receive 
an error because bytestrings do not have a `to_pb` method, as is expected of 
`RowFilter` type objects.
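   A minimal sketch of the suggested guard (hypothetical helper, not part of 
the PR; it maps the legacy `b''` sentinel to `None` and duck-types anything 
else against the `to_pb` method that `RowFilter` objects provide):

   ```python
   def _effective_filter(filter_):
       """Normalize a filter argument before handing it to read_rows."""
       if not filter_:  # covers None and the legacy b'' sentinel
           return None
       if not hasattr(filter_, 'to_pb'):
           raise TypeError('filter_ must be a RowFilter, got %r' % (filter_,))
       return filter_

   assert _effective_filter(b'') is None
   assert _effective_filter(None) is None
   ```

   With a guard like this, `read_rows` always receives either `None` (its 
documented default) or a real `RowFilter`, and a bad argument fails fast with 
a clear `TypeError` instead of a missing-`to_pb` error deep inside the client.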
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 315294)
Time Spent: 39h 20m  (was: 39h 10m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 39h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=315293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315293
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 19/Sep/19 20:16
Start Date: 19/Sep/19 20:16
Worklog Time Spent: 10m 
  Work Description: drubinstein commented on pull request #8457: 
[BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r326364428
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=315236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315236
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 19/Sep/19 18:49
Start Date: 19/Sep/19 18:49
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-533260992
 
 
   During the latest series of tests using the Dataflow, the following error 
appears:
   
   ```
   ...
   INFO:root:2019-09-19T12:19:07.484Z: JOB_MESSAGE_DETAILED: Autoscaling: 
Raised the number of workers to 10 based on the rate of progress in the 
currently running step(s).
   INFO:root:2019-09-19T12:20:15.500Z: JOB_MESSAGE_DETAILED: Workers have 
started successfully.
   INFO:root:2019-09-19T12:20:15.528Z: JOB_MESSAGE_DETAILED: Workers have 
started successfully.
   INFO:root:2019-09-19T12:24:39.237Z: JOB_MESSAGE_DETAILED: Checking 
permissions granted to controller Service Account.
   INFO:root:2019-09-19T12:30:39.238Z: JOB_MESSAGE_DETAILED: Checking 
permissions granted to controller Service Account.
   INFO:root:2019-09-19T12:32:51.436Z: JOB_MESSAGE_BASIC: Finished operation 
Bigtable Read/Impulse+Bigtable Read/Split+Bigtable 
Read/Reshuffle/AddRandomKeys+Bigtable 
Read/Reshuffle/ReshufflePerKey/Map(reify_timestamps)+Bigtable 
Read/Reshuffle/ReshufflePerKey/GroupByKe
   y/Reify+Bigtable Read/Reshuffle/ReshufflePerKey/GroupByKey/Write
   INFO:root:2019-09-19T12:32:51.616Z: JOB_MESSAGE_DEBUG: Executing failure 
step failure32
   INFO:root:2019-09-19T12:32:51.677Z: JOB_MESSAGE_ERROR: Workflow failed. 
Causes: S03:Bigtable Read/Impulse+Bigtable Read/Split+Bigtable 
Read/Reshuffle/AddRandomKeys+Bigtable 
Read/Reshuffle/ReshufflePerKey/Map(reify_timestamps)+Bigtable 
Read/Reshuffle/ReshufflePerKey
   /GroupByKey/Reify+Bigtable Read/Reshuffle/ReshufflePerKey/GroupByKey/Write 
failed., The job failed because a work item has failed 4 times. Look in 
previous log entries for the cause of each one of the 4 failures. For more 
information, see https://cloud.google.com/d
   ataflow/docs/guides/common-errors. The work item was attempted on these 
workers:
 bigtableio-it-test-10k-20-09190518-70i4-harness-h8n9
 Root cause: The worker lost contact with the service.,
 bigtableio-it-test-10k-20-09190518-70i4-harness-h8n9
 Root cause: The worker lost contact with the service.,
 bigtableio-it-test-10k-20-09190518-70i4-harness-h8n9
 Root cause: The worker lost contact with the service.,
 bigtableio-it-test-10k-20-09190518-70i4-harness-h8n9
 Root cause: The worker lost contact with the service.
   INFO:root:2019-09-19T12:32:51.797Z: JOB_MESSAGE_DETAILED: Cleaning up.
   INFO:root:2019-09-19T12:32:51.877Z: JOB_MESSAGE_DEBUG: Starting worker pool 
teardown.
   INFO:root:2019-09-19T12:32:51.903Z: JOB_MESSAGE_BASIC: Stopping worker 
pool...
   INFO:root:2019-09-19T12:37:45.412Z: JOB_MESSAGE_DETAILED: Autoscaling: 
Reduced the number of workers to 0 based on the rate of progress in the 
currently running step(s).
   INFO:root:2019-09-19T12:37:45.506Z: JOB_MESSAGE_BASIC: Worker pool stopped.
   INFO:root:2019-09-19T12:37:45.544Z: JOB_MESSAGE_DEBUG: Tearing down pending 
resources...
   INFO:root:Job 2019-09-19_05_18_33-1351245893518858257 is in state 
JOB_STATE_FAILED
   ERROR
   ...
   ```
   
   The code used in this test is essentially the same code that ran 
successfully a couple of months ago. What has changed since then is the test 
code in the `bigtableio_it_test.py` file, which was refactored to eliminate 
redundant steps in parsing the pipeline parameters from the command line 
arguments. The parameters themselves were not changed and, based on the log 
output, are properly set within the `beam.Pipeline` object that runs the 
pipeline during the test. Based on the message timings, it seems the pipeline 
is able to start the workers, after which everything hangs. The "Checking 
permissions..." message could suggest an authorization issue, but the same 
type of pipeline with the same parameters (except for the job name) is 
perfectly able to complete the 'write' part of the same test.
   
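   For context, the refactoring in question follows the usual Beam pattern of 
consuming test-specific flags and forwarding everything else to the pipeline. 
A minimal, stdlib-only sketch of that parameter-parsing step (the flag names 
here are illustrative, not the actual test's flags):

```python
import argparse

def parse_test_args(argv):
    """Split test-specific options from the remaining pipeline options.

    Mirrors argparse's parse_known_args pattern commonly used by Beam
    tests: the test consumes its own flags and forwards the rest
    (runner, region, etc.) to the pipeline unchanged.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--project', required=True)
    parser.add_argument('--instance', required=True)
    parser.add_argument('--table', default='test-table')
    known, pipeline_args = parser.parse_known_args(argv)
    return known, pipeline_args

# Example: test flags plus pass-through pipeline flags.
known, rest = parse_test_args(
    ['--project', 'my-project', '--instance', 'my-instance',
     '--runner=DataflowRunner', '--region=us-central1'])
```

Unrecognized flags such as `--runner` are returned untouched, so they can be 
handed to `PipelineOptions` as-is.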
   Reverting the code to the initial version of the current PR, which had been 
tested several times prior to submission and shown to work, now throws a 
pickling error:
   
   ```
   ...
   ==
   ERROR: test_bigtable_io (__main__.BigtableIOTest)
   --
   Traceback (most recent call last):
 File 
"c:\git\beam-MF\sdks\python\apache_beam\io\gcp\bigtableio_it_test.py", line 
120, in test_bigtable_io
    self.result.wait_until_finis
    ```

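A pickling error at this point usually means the DoFn instance carries 
non-picklable state (e.g. a live Bigtable client). The connector's DoFns guard 
against this with the `__getstate__`/`__setstate__` pattern; a stripped-down, 
stdlib-only sketch of that pattern (names abbreviated from the PR's 
`_BigtableReadFn`):

```python
import pickle

class ReadFn:
    """Pickles only the plain-dict options; non-picklable runtime state
    (client, table, counters) is dropped and rebuilt lazily on the
    worker after unpickling."""

    def __init__(self, project_id, instance_id, table_id):
        self._initialize({'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id})

    def _initialize(self, options):
        self._beam_options = options
        self.table = None  # recreated in start_bundle on the worker

    def __getstate__(self):
        return self._beam_options

    def __setstate__(self, options):
        self._initialize(options)

# Round-trip through pickle, as the runner does when shipping the DoFn.
clone = pickle.loads(pickle.dumps(ReadFn('my-project', 'my-instance', 'my-table')))
```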
[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=315193&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315193
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 19/Sep/19 17:01
Start Date: 19/Sep/19 17:01
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-533221385
 
 
   Run Python PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 315193)
Time Spent: 38h 50m  (was: 38h 40m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 38h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=313233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313233
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 16/Sep/19 18:17
Start Date: 16/Sep/19 18:17
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-531895429
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 313233)
Time Spent: 38h 40m  (was: 38.5h)






[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=313232&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-313232
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 16/Sep/19 18:17
Start Date: 16/Sep/19 18:17
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-531895338
 
 
   Run Python PostCommit
 



Issue Time Tracking
---

Worklog Id: (was: 313232)
Time Spent: 38.5h  (was: 38h 20m)






[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=311853&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-311853
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 13/Sep/19 02:56
Start Date: 13/Sep/19 02:56
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r324020715
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
+
+class GenerateTestRows(beam.PTransform):
+  """ A PTransform to generate dummy rows to write to a Bigtable Table.
+
+  A PTransform that generates a list of `DirectRow` and writes it to a 
Bigtable Table.
+  """
+  def __init__(self):
+super(self.__class__, self).__init__()
+self.beam_options = {'project_id': PROJECT_ID,
+ 'instance_id': INSTANCE_ID,
+ 'table_id': TABLE_ID}
+
+  def _generate(self):
+for i in range(ROW_COUNT):
+  key = "key_%s" % ('{0:012}'.format(i))
+  test_row = row.DirectRow(row_key=key)
+  value = ''.join(random.choice(LETTERS_AND_DIGITS) for _ in 
range(CELL_SIZE))
+  for j in range(COLUMN_COUNT):
+test_row.set_cell(column_family_id=COLUMN_FAMILY_ID,
+  column=('field%s' % j).encode('utf-8'),
+  value=value,
+  timestamp=datetime.datetime.now())
+  yield test_row
+
+  def expand(self, pvalue):
+return (pvalue
+| beam.Create(self._generate())
+| 
bigtableio.WriteToBigTable(project_id=self.beam_options['project_id'],
+ 
instance_id=self.beam_options['instance_id'],
+ 
table_id=self.beam_options['table_id']))
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOTest(unittest.TestCase):
+  """ Bigtable IO Connector Test
+
+  This tests the connector both ways, first writing rows to a new table, then 
reading them and comparing the counters
+  """
+  def setUp(self):
+self.result = None
+self.table = Client(project=PROJECT_ID, admin=True)\
+.instance(instance_id=INSTANCE_ID)\
+.table(TABLE_ID)
+
+if not self.table.exists():
+  column_families = {COLUMN_FAMILY_ID: column_family.MaxVersionsGCRule(2)}
+  self.table.create(column_families=column_families)
+  logging.info('Table {} has been created!'.format(TABLE_ID))
+
+  @attr('IT')
+  def test_bigtable_io(self):
+print 'Project ID: ', PROJECT_ID
+print 'Instance ID:', INSTANCE_ID
+print 'Table ID:   ', TABLE_ID
 
 Review comment:
   Changed to `logging.info(...)`
 



Issue Time Tracking
---

Worklog Id: (was: 311853)
Time Spent: 38h 20m  (was: 38h 10m)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=311851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-311851
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 13/Sep/19 02:38
Start Date: 13/Sep/19 02:38
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r324018337
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
 
 Review comment:
   Fixed.
 



Issue Time Tracking
---

Worklog Id: (was: 311851)
Time Spent: 38h 10m  (was: 38h)






[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=308032&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308032
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 06/Sep/19 17:24
Start Date: 06/Sep/19 17:24
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r321834329
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def 
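The `_BigtableReadFn` above keeps a `sample_row_keys` slot, and the (truncated) 
`ReadFromBigTable.expand` presumably turns those sample keys into contiguous 
key ranges that can be read in parallel. A hedged sketch of that splitting step 
(the `split_ranges` helper is hypothetical; `b''` denotes the open ends of the 
keyspace, as in the Bigtable API):

```python
def split_ranges(sample_keys):
    """Turn sorted Bigtable sample row keys into contiguous
    (start_key, end_key) ranges covering the whole keyspace."""
    ranges = []
    start = b''
    for key in sample_keys:
        ranges.append((start, key))
        start = key
    ranges.append((start, b''))  # tail range up to the open end
    return ranges

ranges = split_ranges([b'key_000000000100', b'key_000000000500'])
```

Each resulting `(start, end)` pair would then become one element fed to 
`_BigtableReadFn.process`, which reads `[start_key, end_key)` from the table.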

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=308029&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308029
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 06/Sep/19 17:22
Start Date: 06/Sep/19 17:22
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r321833569
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=305960&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305960
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Sep/19 23:06
Start Date: 03/Sep/19 23:06
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r320513876
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=305785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305785
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 03/Sep/19 18:52
Start Date: 03/Sep/19 18:52
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-527590606
 
 
   Any updates ?
 



Issue Time Tracking
---

Worklog Id: (was: 305785)
Time Spent: 37.5h  (was: 37h 20m)






[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=302155&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-302155
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 27/Aug/19 16:10
Start Date: 27/Aug/19 16:10
Worklog Time Spent: 10m 
  Work Description: galic1987 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r318142770
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=300363&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300363
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 23/Aug/19 16:26
Start Date: 23/Aug/19 16:26
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r317208505
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
   A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
  project_id(str): GCP Project to write the Rows to
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
- 'instance_id': instance_id,
- 'table_id': table_id}
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows from
+  instance_id: [str] GCP Instance to read the Rows from
+  table_id: [str] GCP Table to read the Rows from
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(_BigtableReadFn, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=element.start_position,
+end_key=element.end_position,
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(str(self._beam_options['filter_']),
+   label='Bigtable Filter')
+   }
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows from
+  instance_id: [str] GCP Instance to read the Rows from
+  table_id: [str] GCP Table to read the Rows from
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(ReadFromBigTable, self).__init__()
+self._beam_options = {'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'filter_': filter_}
+
+  def 
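The `__getstate__`/`__setstate__` plus `start_bundle` combination in the diff above keeps heavyweight, unpicklable clients out of the serialized DoFn state. A minimal stand-alone sketch of the same idea in plain Python (hypothetical `LazyResourceFn` class, not Beam API):

```python
import pickle


class LazyResourceFn(object):
    """Pickles only plain config; rebuilds heavy handles lazily, as the DoFn above does."""

    def __init__(self, options):
        self._initialize(options)

    def _initialize(self, options):
        self._options = options
        self.client = None  # stands in for an expensive, unpicklable client

    def __getstate__(self):
        # Serialize only the plain-dict configuration.
        return self._options

    def __setstate__(self, options):
        # Reset all transient state on deserialization.
        self._initialize(options)

    def start_bundle(self):
        # Create the heavy handle once per bundle, after unpickling.
        if self.client is None:
            self.client = object()


fn = LazyResourceFn({'project_id': 'p', 'table_id': 't'})
fn.start_bundle()
clone = pickle.loads(pickle.dumps(fn))
assert clone.client is None               # the handle was not serialized
assert clone._options == fn._options      # the config survived the round trip
```

This is why `_initialize` resets `table` to `None`: the worker that unpickles the DoFn recreates the client in `start_bundle` rather than receiving a serialized connection.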

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=298058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298058
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 20/Aug/19 17:33
Start Date: 20/Aug/19 17:33
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-523117265
 
 
   Run Portable_Python PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298058)
Time Spent: 37h  (was: 36h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 37h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=298057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298057
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 20/Aug/19 17:33
Start Date: 20/Aug/19 17:33
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-523117198
 
 
   I found one more lint error:
   ```
   The following files are missing a call to unittest.main():
   --
   apache_beam/io/gcp/bigtableio_it_test.py
   ```
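   For reference, that lint check looks for the standard test entry point at the bottom of each test module. A minimal sketch of the expected shape (hypothetical test class):

```python
import unittest


class ExampleTest(unittest.TestCase):
    def test_truth(self):
        self.assertTrue(True)


if __name__ == '__main__':
    # The lint check only requires a call to unittest.main(); exit=False is
    # used here so the interpreter stays alive after the demo run.
    unittest.main(exit=False)
```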
   
   The BTIO test failed due to some issue parsing a table name: 
https://scans.gradle.com/s/rslxxeoqgmw4a/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Gcp#L6836
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298057)
Time Spent: 36h 50m  (was: 36h 40m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 36h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=297971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297971
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 20/Aug/19 15:40
Start Date: 20/Aug/19 15:40
Worklog Time Spent: 10m 
  Work Description: eddie-scio commented on issue #8457: [BEAM-3342] Create 
a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-523072457
 
 
   Thanks all for continuing to push this through!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 297971)
Time Spent: 36h 40m  (was: 36.5h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 36h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=296042&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-296042
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 16/Aug/19 03:41
Start Date: 16/Aug/19 03:41
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-521873132
 
 
   @pabloem You're anything but annoying, and I really appreciate the guidance; 
will get to setting up the `TestPipeline` next.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 296042)
Time Spent: 36.5h  (was: 36h 20m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 36.5h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295847&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295847
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 22:55
Start Date: 15/Aug/19 22:55
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-521825485
 
 
   Hope I'm not being annoying - I'm just passing pointers to where you can 
find the errors : )
   
   See:
   There's some py3 compatibility checks:
   
https://scans.gradle.com/s/nzwat46hx6qh6/console-log?task=:sdks:python:test-suites:tox:py2:lintPy27_3#L30
   
   And a couple other lint issues:
   
https://scans.gradle.com/s/nzwat46hx6qh6/console-log?task=:sdks:python:test-suites:tox:py2:lintPy27#L51
   
   The tests finally arrived at the point of running the bigtableio test, and 
it's failing with this info:
   
https://scans.gradle.com/s/nzwat46hx6qh6/console-log?task=:sdks:python:test-suites:tox:py35:testPy35Gcp#L7074
   
   We're getting pretty close.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295847)
Time Spent: 36h 20m  (was: 36h 10m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 36h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295708
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 19:50
Start Date: 15/Aug/19 19:50
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-521773234
 
 
   @pabloem running `tox -e py27-lint` locally throws an error [reproduced on 2 
different machines]:
   ```
   (Beam-MF) C:\git\beam-MF\sdks\python>tox -e py27-lint
   GLOB sdist-make: C:\git\beam-MF\sdks\python\setup.py
   py27-lint create: C:\git\beam-MF\sdks\python\target\.tox\py27-lint
   py27-lint installdeps: pycodestyle==2.3.1, pylint==1.9.3, future==0.16.0, 
isort==4.2.15, flake8==3.5.0
   ERROR: invocation failed (exit code 2), logfile: 
C:\git\beam-MF\sdks\python\target\.tox\py27-lint\log\py27-lint-1.log
   == log start 
==
   C:\git\beam-MF\sdks\python\target\.tox\py27-lint\Scripts/python: can't open 
file 'C:\git\beam-MF\sdks\python\target\.tox\py27-lint\Scripts/pip': [Errno 2] 
No such file or directory
   
   === log end 
===
   ERROR: could not install deps [pycodestyle==2.3.1, pylint==1.9.3, 
future==0.16.0, isort==4.2.15, flake8==3.5.0]; v = 
InvocationError(u"'C:\\git\\beam-MF\\sdks\\python\\target\\.tox\\py27-lint\\Scripts/python'
 'C:\\git\\beam-MF\\sdks\\python\\target\\.tox\\py27-lint
   \\Scripts/pip' install --retries 10 pycodestyle==2.3.1 pylint==1.9.3 
future==0.16.0 isort==4.2.15 flake8==3.5.0", 2)
   ___ summary 
___
   ERROR:   py27-lint: could not install deps [pycodestyle==2.3.1, 
pylint==1.9.3, future==0.16.0, isort==4.2.15, flake8==3.5.0]; v = 
InvocationError(u"'C:\\git\\beam-MF\\sdks\\python\\target\\.tox\\py27-lint\\Scripts/python'
 'C:\\git\\beam-MF\\sdks\\python\\target\\.t
   ox\\py27-lint\\Scripts/pip' install --retries 10 pycodestyle==2.3.1 
pylint==1.9.3 future==0.16.0 isort==4.2.15 flake8==3.5.0", 2)
   ```
   Although not the best, as of right now using Git checks seems to be the only 
viable way.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295708)
Time Spent: 36h 10m  (was: 36h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 36h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295583&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295583
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 17:05
Start Date: 15/Aug/19 17:05
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-521718261
 
 
   To run the lint checks from your machine, you can run `tox -e py27-lint`, 
`tox -e py3-lint`. Same for the whole set of unit tests of the project: `tox -e 
py35-gcp`. All of that from within `sdks/python`, where `tox.ini` is.
   
   Remaining lint issues: 
https://scans.gradle.com/s/czkwhkh2l3e22/console-log?task=:sdks:python:test-suites:tox:py2:lintPy27
   
   Python 3 complains about using `print` as a statement: 
https://scans.gradle.com/s/czkwhkh2l3e22/console-log?task=:sdks:python:test-suites:tox:py35:lintPy35
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295583)
Time Spent: 36h  (was: 35h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 36h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295111
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 00:56
Start Date: 15/Aug/19 00:56
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r314140821
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
+
+class GenerateTestRows(beam.PTransform):
+  """ A PTransform to generate dummy rows to write to a Bigtable Table.
+
+  A PTransform that generates a list of `DirectRow` and writes it to a 
Bigtable Table.
+  """
+  def __init__(self):
+super(GenerateTestRows, self).__init__()
+self.beam_options = {'project_id': PROJECT_ID,
+ 'instance_id': INSTANCE_ID,
+ 'table_id': TABLE_ID}
+
+  def _generate(self):
+for i in range(ROW_COUNT):
+  key = "key_%s" % ('{0:012}'.format(i))
+  test_row = row.DirectRow(row_key=key)
+  value = ''.join(random.choice(LETTERS_AND_DIGITS) for _ in range(CELL_SIZE))
+  for j in range(COLUMN_COUNT):
+test_row.set_cell(column_family_id=COLUMN_FAMILY_ID,
+  column=('field%s' % j).encode('utf-8'),
+  value=value,
+  timestamp=datetime.datetime.now())
+  yield test_row
+
+  def expand(self, pvalue):
+return (pvalue
+| beam.Create(self._generate())
+| bigtableio.WriteToBigTable(project_id=self.beam_options['project_id'],
+ instance_id=self.beam_options['instance_id'],
+ table_id=self.beam_options['table_id']))
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOTest(unittest.TestCase):
+  """ Bigtable IO Connector Test
+
+  This tests the connector both ways, first writing rows to a new table, then 
reading them and comparing the counters
+  """
+  def setUp(self):
+self.result = None
+self.table = Client(project=PROJECT_ID, admin=True)\
+.instance(instance_id=INSTANCE_ID)\
+.table(TABLE_ID)
+
+if not self.table.exists():
+  column_families = {COLUMN_FAMILY_ID: column_family.MaxVersionsGCRule(2)}
+  self.table.create(column_families=column_families)
+  logging.info('Table {} has been created!'.format(TABLE_ID))
+
+  @attr('IT')
+  def test_bigtable_io(self):
+print 'Project ID: ', PROJECT_ID
+print 'Instance ID:', INSTANCE_ID
+print 'Table ID:   ', TABLE_ID
 
 Review comment:
   Maybe avoid printing here, or use `logging.info` instead. If you do want to 
use print, use `print(...)` as a function (this is necessary for Py3 
compatibility).
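   The Py3-safe form suggested here, shown next to the `logging` alternative (a sketch; the project ID value is a placeholder):

```python
from __future__ import print_function  # makes print a function on Python 2; no-op on 3

import logging

logging.basicConfig(level=logging.INFO)

# Preferred in test code: messages go through the logging framework.
logging.info('Project ID: %s', 'my-project')

# If printing is really wanted, the function form works on both Python 2 and 3.
print('Project ID:', 'my-project')
```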
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295111)
Time 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295108
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 00:55
Start Date: 15/Aug/19 00:55
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r314140722
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
 
 Review comment:
   There are lint errors. See them here: 
https://scans.gradle.com/s/fcjggzcdv6klo/console-log?task=:sdks:python:test-suites:tox:py2:lintPy27#L20
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295108)
Time Spent: 35h 20m  (was: 35h 10m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295109
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 00:55
Start Date: 15/Aug/19 00:55
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r314141487
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
+
+class GenerateTestRows(beam.PTransform):
+  """ A PTransform to generate dummy rows to write to a Bigtable Table.
+
+  A PTransform that generates a list of `DirectRow` and writes it to a 
Bigtable Table.
+  """
+  def __init__(self):
+super(GenerateTestRows, self).__init__()
+self.beam_options = {'project_id': PROJECT_ID,
+ 'instance_id': INSTANCE_ID,
+ 'table_id': TABLE_ID}
+
+  def _generate(self):
+for i in range(ROW_COUNT):
+  key = "key_%s" % ('{0:012}'.format(i))
+  test_row = row.DirectRow(row_key=key)
+  value = ''.join(random.choice(LETTERS_AND_DIGITS) for _ in range(CELL_SIZE))
+  for j in range(COLUMN_COUNT):
+test_row.set_cell(column_family_id=COLUMN_FAMILY_ID,
+  column=('field%s' % j).encode('utf-8'),
+  value=value,
+  timestamp=datetime.datetime.now())
+  yield test_row
+
+  def expand(self, pvalue):
+return (pvalue
+| beam.Create(self._generate())
+| bigtableio.WriteToBigTable(project_id=self.beam_options['project_id'],
+ instance_id=self.beam_options['instance_id'],
+ table_id=self.beam_options['table_id']))
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOTest(unittest.TestCase):
+  """ Bigtable IO Connector Test
+
+  This tests the connector both ways, first writing rows to a new table, then 
reading them and comparing the counters
+  """
+  def setUp(self):
+self.result = None
+self.table = Client(project=PROJECT_ID, admin=True)\
+.instance(instance_id=INSTANCE_ID)\
+.table(TABLE_ID)
+
+if not self.table.exists():
+  column_families = {COLUMN_FAMILY_ID: column_family.MaxVersionsGCRule(2)}
+  self.table.create(column_families=column_families)
+  logging.info('Table {} has been created!'.format(TABLE_ID))
+
+  @attr('IT')
+  def test_bigtable_io(self):
+print 'Project ID: ', PROJECT_ID
+print 'Instance ID:', INSTANCE_ID
+print 'Table ID:   ', TABLE_ID
+
+pipeline_options = PipelineOptions(pipeline_parameters(job_name=make_job_name()))
+p = beam.Pipeline(options=pipeline_options)
+_ = (p | 'Write Test Rows' >> GenerateTestRows())
+
+self.result = p.run()
+self.result.wait_until_finish()
+
+assert self.result.state == PipelineState.DONE
+
+if not hasattr(self.result, 'has_job') or self.result.has_job:
+  query_result = self.result.metrics().query(MetricsFilter().with_name('Written Row'))
+  if query_result['counters']:
+read_counter = query_result['counters'][0]
+logging.info('Num

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295110&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295110
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 00:55
Start Date: 15/Aug/19 00:55
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r314140638
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
 
 Review comment:
   Please avoid relative imports. Import via apache_beam.io.gcp.bigtableio
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295110)
Time Spent: 35h 40m  (was: 35.5h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 35h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=295107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295107
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 15/Aug/19 00:55
Start Date: 15/Aug/19 00:55
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r314140821
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
+
+class GenerateTestRows(beam.PTransform):
+  """ A PTransform to generate dummy rows to write to a Bigtable Table.
+
+  A PTransform that generates a list of `DirectRow` and writes it to a Bigtable Table.
+  """
+  def __init__(self):
+super(self.__class__, self).__init__()
+self.beam_options = {'project_id': PROJECT_ID,
+ 'instance_id': INSTANCE_ID,
+ 'table_id': TABLE_ID}
+
+  def _generate(self):
+for i in range(ROW_COUNT):
+  key = "key_%s" % ('{0:012}'.format(i))
+  test_row = row.DirectRow(row_key=key)
+  value = ''.join(random.choice(LETTERS_AND_DIGITS) for _ in range(CELL_SIZE))
+  for j in range(COLUMN_COUNT):
+test_row.set_cell(column_family_id=COLUMN_FAMILY_ID,
+  column=('field%s' % j).encode('utf-8'),
+  value=value,
+  timestamp=datetime.datetime.now())
+  yield test_row
+
+  def expand(self, pvalue):
+return (pvalue
+| beam.Create(self._generate())
+| bigtableio.WriteToBigTable(project_id=self.beam_options['project_id'],
+ instance_id=self.beam_options['instance_id'],
+ table_id=self.beam_options['table_id']))
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOTest(unittest.TestCase):
+  """ Bigtable IO Connector Test
+
+  This tests the connector both ways, first writing rows to a new table, then reading them and comparing the counters
+  """
+  def setUp(self):
+self.result = None
+self.table = Client(project=PROJECT_ID, admin=True)\
+.instance(instance_id=INSTANCE_ID)\
+.table(TABLE_ID)
+
+if not self.table.exists():
+  column_families = {COLUMN_FAMILY_ID: column_family.MaxVersionsGCRule(2)}
+  self.table.create(column_families=column_families)
+  logging.info('Table {} has been created!'.format(TABLE_ID))
+
+  @attr('IT')
+  def test_bigtable_io(self):
+print 'Project ID: ', PROJECT_ID
+print 'Instance ID:', INSTANCE_ID
+print 'Table ID:   ', TABLE_ID
 
 Review comment:
   Maybe avoid printing here, or use `logging.info` instead. If you do want to 
use print, use `print(...)` as a function
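The suggestion above can be sketched with the standard `logging` module. This is an illustrative, self-contained snippet; the ID values and logger name are placeholders, not taken from the PR:

```python
import io
import logging

# Placeholder configuration values, standing in for the test's module globals.
PROJECT_ID, INSTANCE_ID, TABLE_ID = "my-project", "my-instance", "my-table"

# Route log records into an in-memory buffer so the example is self-contained;
# in a real integration test the default stderr handler would be fine.
logger = logging.getLogger("bigtableio_it_test")
logger.setLevel(logging.INFO)
buffer = io.StringIO()
logger.addHandler(logging.StreamHandler(buffer))

# Instead of Python 2 print statements, log the test configuration:
logger.info("Project ID:  %s", PROJECT_ID)
logger.info("Instance ID: %s", INSTANCE_ID)
logger.info("Table ID:    %s", TABLE_ID)

print(buffer.getvalue())
```

Unlike a bare `print`, `logging.info` respects the configured log level and handlers, so the output can be captured or silenced by the test runner.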
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 295107)

> Create a Cloud Bigtable IO connector for Python
> 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=294203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294203
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 13/Aug/19 21:29
Start Date: 13/Aug/19 21:29
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#issuecomment-521016687
 
 
   I was out a few days. Looking again today.
 



Issue Time Tracking
---

Worklog Id: (was: 294203)
Time Spent: 35h 10m  (was: 35h)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 35h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=290171&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290171
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Aug/19 02:10
Start Date: 07/Aug/19 02:10
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r311338409
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +129,148 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
+self._beam_options = {'project_id': project_id,
  'instance_id': instance_id,
  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, start_key=None, end_key=None, filter_=b''):
 
 Review comment:
   Agreed, since they both aren't used yet - removed from the constructor, as 
suggested.
 



Issue Time Tracking
---

Worklog Id: (was: 290171)
Time Spent: 35h  (was: 34h 50m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 35h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=290167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290167
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Aug/19 02:08
Start Date: 07/Aug/19 02:08
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r311340039
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -122,22 +129,148 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
 
   """
-  def __init__(self, project_id=None, instance_id=None,
-   table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
 """ The PTransform to access the Bigtable Write connector
 Args:
   project_id(str): GCP Project to write the Rows
   instance_id(str): GCP Instance to write the Rows
   table_id(str): GCP Table to write the `DirectRows`
 """
 super(WriteToBigTable, self).__init__()
-self.beam_options = {'project_id': project_id,
+self._beam_options = {'project_id': project_id,
  'instance_id': instance_id,
  'table_id': table_id}
 
   def expand(self, pvalue):
-beam_options = self.beam_options
+beam_options = self._beam_options
 return (pvalue
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  """ Creates the connector that can read rows for a Beam pipeline
+
+  Args:
+project_id(str): GCP Project ID
+instance_id(str): GCP Instance ID
+table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id, start_key=None, end_key=None, filter_=b''):
+""" Constructor of the Read connector of Bigtable
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+  'instance_id': instance_id,
+  'table_id': table_id,
+  'start_key': start_key,
+  'end_key': end_key,
+  'filter_': filter_})
+
+  def __getstate__(self):
+return self._beam_options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def _initialize(self, options):
+self._beam_options = options
+self.table = None
+self.sample_row_keys = None
+self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')
+
+  def start_bundle(self):
+if self.table is None:
+  self.table = Client(project=self._beam_options['project_id'])\
+.instance(self._beam_options['instance_id'])\
+.table(self._beam_options['table_id'])
+
+  def process(self, element, **kwargs):
+for row in self.table.read_rows(start_key=self._beam_options['start_key'],
+end_key=self._beam_options['end_key'],
+filter_=self._beam_options['filter_']):
+  self.row_count.inc()
+  yield row
+
+  def get_initial_restriction(self, element):
+pass
+
+  def finish_bundle(self):
+pass
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._beam_options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._beam_options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._beam_options['table_id'],
+   label='Bigtable Table Id'),
+'filter_': DisplayDataItem(self._beam_options['filter_'],
+   label='Bigtable Filter')
+}
+
+
+class ReadFromBigTable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=b''):
+""" The PTransform to access the Bigtable Read connector
+
+Args:
+  project_id: [str] GCP Project to read the Rows
+  instance_id: [str] GCP Instance to read the Rows
+  table_id: [str] GCP Table to read the Rows
+  filter_: [RowFilter] Filter to apply to columns in a row.
+"""
+super(self.__class__, self).__init__()
+self._beam_options = {'project_id': project_id,
+ 'inst
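The `__getstate__`/`__setstate__` idiom in the hunk above is what makes the `DoFn` safely picklable: only the plain options dict is serialized, while transient state (client, table handle, counters) is dropped and rebuilt lazily on the worker. A minimal, Beam-free sketch of the same pattern; attribute names loosely follow the PR, and the string table handle is a stand-in for the real Bigtable client object:

```python
import pickle

class ReadFn:
    """Holds only a plain options dict; heavy state is rebuilt per bundle."""

    def __init__(self, project_id, instance_id, table_id):
        self._initialize({'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id})

    def __getstate__(self):
        # Only the options dict is serialized when shipping to workers.
        return self._beam_options

    def __setstate__(self, options):
        # On unpickling, rebuild the object with transient state cleared.
        self._initialize(options)

    def _initialize(self, options):
        self._beam_options = options
        self.table = None  # transient: recreated in start_bundle()

    def start_bundle(self):
        if self.table is None:
            # Stand-in for constructing the Bigtable client/table handle.
            self.table = "table:%s" % self._beam_options['table_id']

fn = ReadFn('my-project', 'my-instance', 'my-table')
fn.start_bundle()
clone = pickle.loads(pickle.dumps(fn))
# The clone carries the options but not the transient table handle.
print(clone._beam_options['table_id'], clone.table)
```

On a real runner the pickled `DoFn` is shipped to each worker, so anything that cannot be pickled (gRPC channels, client objects) has to live behind this kind of lazy initialization.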

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

2019-08-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=290168&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290168
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 07/Aug/19 02:08
Start Date: 07/Aug/19 02:08
Worklog Time Spent: 10m 
  Work Description: mf2199 commented on pull request #8457: [BEAM-3342] 
Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r311341218
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
 ##
 @@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Integration test for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+import random
+import string
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.util import assert_that, equal_to
+from apache_beam.transforms.combiners import Count
+
+try:
+  from google.cloud.bigtable import enums, row, column_family, Client
+except ImportError:
+  Client = None
+
+import bigtableio
+
+class GenerateTestRows(beam.PTransform):
+  """ A PTransform to generate dummy rows to write to a Bigtable Table.
+
+  A PTransform that generates a list of `DirectRow` and writes it to a Bigtable Table.
+  """
+  def __init__(self):
+super(self.__class__, self).__init__()
+self.beam_options = {'project_id': PROJECT_ID,
+ 'instance_id': INSTANCE_ID,
+ 'table_id': TABLE_ID}
+
+  def _generate(self):
+for i in range(ROW_COUNT):
+  key = "key_%s" % ('{0:012}'.format(i))
+  test_row = row.DirectRow(row_key=key)
+  value = ''.join(random.choice(LETTERS_AND_DIGITS) for _ in range(CELL_SIZE))
+  for j in range(COLUMN_COUNT):
+test_row.set_cell(column_family_id=COLUMN_FAMILY_ID,
+  column=('field%s' % j).encode('utf-8'),
+  value=value,
+  timestamp=datetime.datetime.now())
+  yield test_row
+
+  def expand(self, pvalue):
+return (pvalue
+| beam.Create(self._generate())
+| bigtableio.WriteToBigTable(project_id=self.beam_options['project_id'],
+ instance_id=self.beam_options['instance_id'],
+ table_id=self.beam_options['table_id']))
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOTest(unittest.TestCase):
+  """ Bigtable IO Connector Test
+
+  This tests the connector both ways, first writing rows to a new table, then reading them and comparing the counters
+  """
+  def setUp(self):
+self.result = None
+self.table = Client(project=PROJECT_ID, admin=True)\
+.instance(instance_id=INSTANCE_ID)\
+.table(TABLE_ID)
+
+if not self.table.exists():
+  column_families = {COLUMN_FAMILY_ID: column_family.MaxVersionsGCRule(2)}
+  self.table.create(column_families=column_families)
+  logging.info('Table {} has been created!'.format(TABLE_ID))
+
+  def test_bigtable_io(self):
 
 Review comment:
   Again, thanks for the tip. Done and noted for the future.
 



Issue Time Tracking
---

Worklog Id: (was: 290168)
Time Spent: 34h 50m  (was: 34h 40m)

> Create a Cloud Bigtable IO connector for Python
> ---
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
> 
