[jira] [Work logged] (BEAM-7825) Python's DirectRunner emits multiple panes per window and does not discard late data

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7825?focusedWorklogId=323991&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323991
 ]

ASF GitHub Bot logged work on BEAM-7825:


Author: ASF GitHub Bot
Created on: 05/Oct/19 23:14
Start Date: 05/Oct/19 23:14
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #9164: [BEAM-7825] Add 
test showing inconsistent stream processing with DirectRunner
URL: https://github.com/apache/beam/pull/9164#issuecomment-538696939
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323991)
Time Spent: 4.5h  (was: 4h 20m)

> Python's DirectRunner emits multiple panes per window and does not discard 
> late data
> -----------------------------------------------------------------------
>
> Key: BEAM-7825
> URL: https://issues.apache.org/jira/browse/BEAM-7825
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.13.0
> Environment: OS: Debian rodete.
> Beam versions: 2.15.0.dev.
> Python versions: Python 2.7, Python 3.7
>Reporter: Alexey Strokach
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The documentation for Beam's Windowing and Triggers functionality [states 
> that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if 
> you use Beam’s default windowing configuration and default trigger, Beam 
> outputs the aggregated result when it estimates all data has arrived, and 
> discards all subsequent data for that window"_. However, it seems that the 
> current behavior of Python's DirectRunner is inconsistent with both of those 
> points. As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, 
> DirectRunner appears to process every data point that it reads from the input 
> stream, irrespective of whether or not the timestamp of that data point is 
> older than the timestamps of the windows that have already been processed. 
> Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} 
> test shows, DirectRunner generates multiple "panes" for the same window, 
> apparently disregarding the notion of a watermark?
> The Dataflow runner passes both of those end-to-end tests.
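For reference, the documented default behavior quoted above can be modeled in a few lines of plain Python. This is a conceptual sketch, not DirectRunner code, and the function name is invented: a "perfect" watermark tracks the maximum timestamp seen so far, each window fires exactly one pane once the watermark passes its end, and elements behind the watermark are discarded.

```python
def fire_default_trigger(events, window_size=60):
    """Conceptual model of Beam's default trigger with fixed windows.

    `events` is an iterable of (timestamp, value) pairs; returns a list of
    (window_start, values) panes, exactly one pane per window.
    """
    watermark = float("-inf")
    open_windows = {}  # window start -> values buffered for that window
    panes = []
    for ts, value in events:
        window_start = ts - ts % window_size
        if window_start + window_size <= watermark:
            continue  # late: the watermark already passed this window; discard
        open_windows.setdefault(window_start, []).append(value)
        watermark = max(watermark, ts)  # perfect watermark: max timestamp seen
        # Fire (exactly once) every window whose end the watermark has passed.
        for ws in sorted(w for w in open_windows if w + window_size <= watermark):
            panes.append((ws, open_windows.pop(ws)))
    # End of stream: the watermark advances past all remaining windows.
    for ws in sorted(open_windows):
        panes.append((ws, open_windows.pop(ws)))
    return panes
```

With events [(10, 'a'), (70, 'b'), (5, 'c'), (130, 'd')], the element at timestamp 5 arrives after the watermark has reached 70, so under this model it is dropped rather than producing a second pane for the first window.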



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7732) Allow access to SpannerOptions in Beam

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7732?focusedWorklogId=323984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323984
 ]

ASF GitHub Bot logged work on BEAM-7732:


Author: ASF GitHub Bot
Created on: 05/Oct/19 21:13
Start Date: 05/Oct/19 21:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on pull request #9048: [BEAM-7732] 
Enable setting custom SpannerOptions.
URL: https://github.com/apache/beam/pull/9048
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323984)
Time Spent: 1.5h  (was: 1h 20m)

> Allow access to SpannerOptions in Beam
> --
>
> Key: BEAM-7732
> URL: https://issues.apache.org/jira/browse/BEAM-7732
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.12.0, 2.13.0
>Reporter: Niel Markwick
>Priority: Minor
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Beam hides the 
> [SpannerOptions|https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java]
>  object behind a 
> [SpannerConfig|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java]
>  object because the SpannerOptions object is not serializable. 
> This means that the only options that can be set are those that can be 
> specified in SpannerConfig - limited to host, project, instance, database.
> Suggestion: add the possibility to set a SpannerOptionsFactory in 
> SpannerConfig:
> {code:java}
> public interface SpannerOptionsFactory extends Serializable {
>    public SpannerOptions create();
> }
> {code}
> This would allow the user to use this factory class to specify custom 
> SpannerOptions before they are passed on to the connectToSpanner() method; 
> connectToSpanner() would then become: 
> {code:java}
> public SpannerAccessor connectToSpanner() {
>   
>   SpannerOptions.Builder builder = spannerOptionsFactory.create().toBuilder();
>   // rest of connectToSpanner follows, setting project, host, etc.
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7732) Allow access to SpannerOptions in Beam

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7732?focusedWorklogId=323983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323983
 ]

ASF GitHub Bot logged work on BEAM-7732:


Author: ASF GitHub Bot
Created on: 05/Oct/19 21:13
Start Date: 05/Oct/19 21:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #9048: [BEAM-7732] Enable 
setting custom SpannerOptions.
URL: https://github.com/apache/beam/pull/9048#issuecomment-538688999
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323983)
Time Spent: 1h 20m  (was: 1h 10m)

> Allow access to SpannerOptions in Beam
> --
>
> Key: BEAM-7732
> URL: https://issues.apache.org/jira/browse/BEAM-7732
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.12.0, 2.13.0
>Reporter: Niel Markwick
>Priority: Minor
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Beam hides the 
> [SpannerOptions|https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java]
>  object behind a 
> [SpannerConfig|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java]
>  object because the SpannerOptions object is not serializable. 
> This means that the only options that can be set are those that can be 
> specified in SpannerConfig - limited to host, project, instance, database.
> Suggestion: add the possibility to set a SpannerOptionsFactory in 
> SpannerConfig:
> {code:java}
> public interface SpannerOptionsFactory extends Serializable {
>    public SpannerOptions create();
> }
> {code}
> This would allow the user to use this factory class to specify custom 
> SpannerOptions before they are passed on to the connectToSpanner() method; 
> connectToSpanner() would then become: 
> {code:java}
> public SpannerAccessor connectToSpanner() {
>   
>   SpannerOptions.Builder builder = spannerOptionsFactory.create().toBuilder();
>   // rest of connectToSpanner follows, setting project, host, etc.
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8351) Support passing in arbitrary KV pairs to sdk worker via external environment config

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8351?focusedWorklogId=323978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323978
 ]

ASF GitHub Bot logged work on BEAM-8351:


Author: ASF GitHub Bot
Created on: 05/Oct/19 20:21
Start Date: 05/Oct/19 20:21
Worklog Time Spent: 10m 
  Work Description: violalyu commented on pull request #9730: [BEAM-8351] 
Support passing in arbitrary KV pairs to sdk worker via external environment 
config
URL: https://github.com/apache/beam/pull/9730#discussion_r331760595
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py
 ##
 @@ -300,6 +301,42 @@ def test__create_process_environment(self):
 command='run.sh',
 ).SerializeToString()))
 
+  def test__create_external_environment(self):
+    self.assertEqual(
+        PortableRunner._create_environment(PipelineOptions.from_dictionary({
+            'environment_type': "EXTERNAL",
+            'environment_config': 'localhost:5',
+        })), beam_runner_api_pb2.Environment(
+            urn=common_urns.environments.EXTERNAL.urn,
+            payload=beam_runner_api_pb2.ExternalPayload(
+                endpoint=endpoints_pb2.ApiServiceDescriptor(
+                    url='localhost:5')
+            ).SerializeToString()))
+    self.assertEqual(
+        PortableRunner._create_environment(PipelineOptions.from_dictionary({
+            'environment_type': "EXTERNAL",
+            'environment_config': ' {"url":"localhost:5", '
 
 Review comment:
   I updated the test for this part in this commit: 
https://github.com/apache/beam/pull/9730/commits/916e170efb2e8f2ec80dba10bc26f8bf00467988
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323978)
Time Spent: 2h 40m  (was: 2.5h)

> Support passing in arbitrary KV pairs to sdk worker via external environment 
> config
> ---
>
> Key: BEAM-8351
> URL: https://issues.apache.org/jira/browse/BEAM-8351
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core, sdk-py-harness
>Reporter: Wanqi Lyu
>Assignee: Wanqi Lyu
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Originally, the environment config for the EXTERNAL environment type only 
> supported passing in a URL for the external worker pool. We want to support 
> passing arbitrary KV pairs to the SDK worker via the external environment 
> config, so that when starting the SDK harness we can get the values from 
> `StartWorkerRequest.params`.
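The behavior described above can be sketched as a standalone function. This is a simplified, hypothetical model, not the actual portable_runner code, and the `params` key is an assumption: the config is treated as JSON when it looks like a JSON object, and as a bare worker-pool URL otherwise.

```python
import json
import re

def parse_external_config(environment_config):
    """Parse --environment_config for environment_type=EXTERNAL.

    Returns (url, params): `params` carries the arbitrary KV pairs that
    would be surfaced to the SDK harness via StartWorkerRequest.params.
    """
    if re.match(r'\s*\{.*\}\s*$', environment_config):
        config = json.loads(environment_config)
        url = config.pop('url', None)
        if url is None:
            raise ValueError('missing "url" field in environment_config JSON')
        return url, config.get('params', {})
    # Plain string: the legacy behavior, just a worker-pool URL.
    return environment_config, {}
```

For example, a plain `localhost:50000` (a hypothetical port) yields an empty params dict, while a JSON config can carry both the URL and arbitrary KV pairs.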



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8351) Support passing in arbitrary KV pairs to sdk worker via external environment config

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8351?focusedWorklogId=323977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323977
 ]

ASF GitHub Bot logged work on BEAM-8351:


Author: ASF GitHub Bot
Created on: 05/Oct/19 20:20
Start Date: 05/Oct/19 20:20
Worklog Time Spent: 10m 
  Work Description: violalyu commented on pull request #9730: [BEAM-8351] 
Support passing in arbitrary KV pairs to sdk worker via external environment 
config
URL: https://github.com/apache/beam/pull/9730#discussion_r331760578
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/portable_runner.py
 ##
 @@ -129,11 +129,25 @@ def _create_environment(options):
   env=(config.get('env') or '')
   ).SerializeToString())
 elif environment_urn == common_urns.environments.EXTERNAL.urn:
+  def looks_like_json(environment_config):
+    import re
+    return re.match(r'\s*\{.*\}\s*$', environment_config)
+
+  if looks_like_json(portable_options.environment_config):
+    config = json.loads(portable_options.environment_config)
+    url = config.pop('url', None)
 
 Review comment:
   Thanks Maximilian! I've updated in this commit: 
https://github.com/apache/beam/pull/9730/commits/95785590993a1035e4619e0f2b7a4434e03f1abc
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323977)
Time Spent: 2.5h  (was: 2h 20m)

> Support passing in arbitrary KV pairs to sdk worker via external environment 
> config
> ---
>
> Key: BEAM-8351
> URL: https://issues.apache.org/jira/browse/BEAM-8351
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core, sdk-py-harness
>Reporter: Wanqi Lyu
>Assignee: Wanqi Lyu
>Priority: Minor
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Originally, the environment config for the EXTERNAL environment type only 
> supported passing in a URL for the external worker pool. We want to support 
> passing arbitrary KV pairs to the SDK worker via the external environment 
> config, so that when starting the SDK harness we can get the values from 
> `StartWorkerRequest.params`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-6947) TestGCSIO.test_last_updated (gcsio_test.py) fails when the current timezone offset and the timezone offset on 2nd of Jan 1970 differ

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6947?focusedWorklogId=323975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323975
 ]

ASF GitHub Bot logged work on BEAM-6947:


Author: ASF GitHub Bot
Created on: 05/Oct/19 20:14
Start Date: 05/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on pull request #8180: [BEAM-6947] 
Fix for failing TestGCSIO.test_last_updated (gcsio_test.py) test
URL: https://github.com/apache/beam/pull/8180
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323975)
Time Spent: 1.5h  (was: 1h 20m)

> TestGCSIO.test_last_updated (gcsio_test.py) fails when the current timezone 
> offset and the timezone offset on 2nd of Jan 1970 differ
> 
>
> Key: BEAM-6947
> URL: https://issues.apache.org/jira/browse/BEAM-6947
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Csaba Kassai
>Assignee: Csaba Kassai
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The test TestGCSIO.test_last_updated uses timestamp 123456.78 as the last 
> updated timestamp. This timestamp is converted into a naive datetime in the 
> Fakefile class get_metadata method (gcsio_test.py line 72). Then, in the GcsIO 
> class last_updated method (gcsio.py), it is converted back to a timestamp. But 
> the conversion is incorrect when the timezone offset differs between 1970 and 
> now. In my case, Singapore is now GMT+8 but was only GMT+7:30 in 1970, so 
> the test fails. 
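The timezone-safe way to round-trip such a timestamp is to keep the datetime timezone-aware throughout, so the conversion never consults the local UTC offset. A minimal sketch of that approach (not the actual Beam patch):

```python
from datetime import datetime, timezone

ts = 123456.78  # 1970-01-02T10:17:36.780000 in UTC

# A naive-datetime round trip implicitly assumes the local UTC offset is
# constant over time, which fails for zones whose offset has changed
# (Singapore: GMT+7:30 in 1970, GMT+8 today).
# Keeping the datetime timezone-aware removes any dependence on local time:
aware = datetime.fromtimestamp(ts, tz=timezone.utc)
recovered = aware.timestamp()
assert abs(recovered - ts) < 1e-6
```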



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-6947) TestGCSIO.test_last_updated (gcsio_test.py) fails when the current timezone offset and the timezone offset on 2nd of Jan 1970 differ

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6947?focusedWorklogId=323974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323974
 ]

ASF GitHub Bot logged work on BEAM-6947:


Author: ASF GitHub Bot
Created on: 05/Oct/19 20:14
Start Date: 05/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #8180: [BEAM-6947] Fix 
for failing TestGCSIO.test_last_updated (gcsio_test.py) test
URL: https://github.com/apache/beam/pull/8180#issuecomment-538684818
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323974)
Time Spent: 1h 20m  (was: 1h 10m)

> TestGCSIO.test_last_updated (gcsio_test.py) fails when the current timezone 
> offset and the timezone offset on 2nd of Jan 1970 differ
> 
>
> Key: BEAM-6947
> URL: https://issues.apache.org/jira/browse/BEAM-6947
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Csaba Kassai
>Assignee: Csaba Kassai
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The test TestGCSIO.test_last_updated uses timestamp 123456.78 as the last 
> updated timestamp. This timestamp is converted into a naive datetime in the 
> Fakefile class get_metadata method (gcsio_test.py line 72). Then, in the GcsIO 
> class last_updated method (gcsio.py), it is converted back to a timestamp. But 
> the conversion is incorrect when the timezone offset differs between 1970 and 
> now. In my case, Singapore is now GMT+8 but was only GMT+7:30 in 1970, so 
> the test fails. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=323973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323973
 ]

ASF GitHub Bot logged work on BEAM-6766:


Author: ASF GitHub Bot
Created on: 05/Oct/19 20:13
Start Date: 05/Oct/19 20:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #9253: [BEAM-6766] 
Implement SMB high-level API
URL: https://github.com/apache/beam/pull/9253#issuecomment-538684812
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323973)
Time Spent: 4h 40m  (was: 4.5h)

> Sort Merge Bucket Join support in Beam
> --
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
>  Issue Type: Improvement
>  Components: extensions-java-join-library, io-ideas
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Design doc: 
> https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit#
> Hi! Spotify has been internally prototyping and testing an implementation of 
> the sort merge join using Beam primitives and we're interested in 
> contributing it open-source – probably to Beam's extensions package in its 
> own `smb` module or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader 
> directly (although this could theoretically be expanded to other 
> serialization formats). We'd add two transforms*, an SMB write and an SMB 
> join. 
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one 
> bucket. (the user code would have to statically specify a # of buckets... 
> hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and within each bucket perform an in-memory sort 
> on join key. If the grouped records are too large to fit in memory, fall back 
> to an external sort (although if this happens, user should probably increase 
> bucket size so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about hash 
> algorithm/# buckets.
> SMB join would take in the input paths for 2 or more Sources, all of which 
> are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the 
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially 
> read files one record at a time, outputting tuples of all record pairs with 
> matching join key.
>  \* These could be implemented either directly as `PTransforms` with the 
> writer being a `DoFn` but I semantically do like the idea of extending 
> `FileBasedSource`/`Sink` with abstract classes like 
> `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a 
> sink as KV pairs of >>, so that the # 
> of elements in the PCollection == # of buckets == # of output files, we could 
> just implement something like `SortedBucketSink` extending `FileBasedSink` 
> with a dynamic file naming function. I'd like to be able to take advantage of 
> the existing write/read implementation logic in the `io` package as much as 
> possible although I guess some of those are package private. 
> –
> From our internal testing, we've seen some substantial performance 
> improvements using the right bucket size--not only by avoiding a shuffle 
> during the join step, but also in storage costs, since we're getting better 
> compression in Avro by storing sorted records.
> Please let us know what you think/any concerns we can address! Our 
> implementation isn't quite production-ready yet, but we'd like to start a 
> discussion about it early.
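The write steps (1-2) and the join's merge step (3) described above can be sketched in plain Python. This is an illustrative, hypothetical sketch, not the proposed Beam implementation: the function names are invented, MD5 stands in for the unspecified hash algorithm, and a real sink would use Beam primitives with an external-sort fallback.

```python
import hashlib

def bucket_id(key, num_buckets):
    # Write step 1: a deterministic partitioning function. Every writer
    # must agree on it so that matching keys land in the same bucket.
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_buckets

def smb_write(records, key_fn, num_buckets):
    # Write steps 1-2: assign each record to a bucket, then sort each
    # bucket by join key (in-memory here; a real sink would fall back to
    # an external sort when a bucket does not fit in memory).
    buckets = {b: [] for b in range(num_buckets)}
    for rec in records:
        buckets[bucket_id(key_fn(rec), num_buckets)].append(rec)
    return {b: sorted(recs, key=key_fn) for b, recs in buckets.items()}

def merge_join(left, right, key_fn):
    # Join step 3: merge two sorted buckets sequentially, emitting a pair
    # for every combination of records sharing a join key.
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        kl, kr = key_fn(left[i]), key_fn(right[j])
        if kl < kr:
            i += 1
        elif kl > kr:
            j += 1
        else:
            # Gather the run of equal keys on each side, then emit the
            # cross product of the two runs.
            i2 = i
            while i2 < len(left) and key_fn(left[i2]) == kl:
                i2 += 1
            j2 = j
            while j2 < len(right) and key_fn(right[j2]) == kl:
                j2 += 1
            out.extend((l, r) for l in left[i:i2] for r in right[j:j2])
            i, j = i2, j2
    return out
```

Because both sides arrive pre-bucketed and pre-sorted, the join needs no shuffle: each pair of same-ID buckets is merged with a single sequential pass.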



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=323972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323972
 ]

ASF GitHub Bot logged work on BEAM-6766:


Author: ASF GitHub Bot
Created on: 05/Oct/19 20:13
Start Date: 05/Oct/19 20:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #9279: [BEAM-6766] 
Implement SMB benchmarks
URL: https://github.com/apache/beam/pull/9279#issuecomment-538684811
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323972)
Time Spent: 4.5h  (was: 4h 20m)

> Sort Merge Bucket Join support in Beam
> --
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
>  Issue Type: Improvement
>  Components: extensions-java-join-library, io-ideas
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Design doc: 
> https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit#
> Hi! Spotify has been internally prototyping and testing an implementation of 
> the sort merge join using Beam primitives and we're interested in 
> contributing it open-source – probably to Beam's extensions package in its 
> own `smb` module or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader 
> directly (although this could theoretically be expanded to other 
> serialization formats). We'd add two transforms*, an SMB write and an SMB 
> join. 
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one 
> bucket. (the user code would have to statically specify a # of buckets... 
> hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and within each bucket perform an in-memory sort 
> on join key. If the grouped records are too large to fit in memory, fall back 
> to an external sort (although if this happens, user should probably increase 
> bucket size so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about hash 
> algorithm/# buckets.
> SMB join would take in the input paths for 2 or more Sources, all of which 
> are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the 
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially 
> read files one record at a time, outputting tuples of all record pairs with 
> matching join key.
>  \* These could be implemented either directly as `PTransforms` with the 
> writer being a `DoFn` but I semantically do like the idea of extending 
> `FileBasedSource`/`Sink` with abstract classes like 
> `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a 
> sink as KV pairs of >>, so that the # 
> of elements in the PCollection == # of buckets == # of output files, we could 
> just implement something like `SortedBucketSink` extending `FileBasedSink` 
> with a dynamic file naming function. I'd like to be able to take advantage of 
> the existing write/read implementation logic in the `io` package as much as 
> possible although I guess some of those are package private. 
> –
> From our internal testing, we've seen some substantial performance 
> improvements using the right bucket size--not only by avoiding a shuffle 
> during the join step, but also in storage costs, since we're getting better 
> compression in Avro by storing sorted records.
> Please let us know what you think/any concerns we can address! Our 
> implementation isn't quite production-ready yet, but we'd like to start a 
> discussion about it early.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=323965&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323965
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 05/Oct/19 18:13
Start Date: 05/Oct/19 18:13
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r331756295
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PortableSchemaCoder.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A version of SchemaCoder that can only produce/consume Row instances.
+ *
+ * Implements the beam:coders:row:v1 standard coder while still satisfying 
the requirement that a
+ * PCollection is only considered to have a schema if its coder is an instance 
of SchemaCoder.
+ */
+public class PortableSchemaCoder extends SchemaCoder {
 
 Review comment:
   I updated this PR to use RowCoder as the Java implementation of 
`beam:coder:row:v1` and removed `PortableSchemaCoder` now that #9424 is in.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323965)
Time Spent: 11h  (was: 10h 50m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 11h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8355) Make BooleanCoder a standard coder

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8355?focusedWorklogId=323944&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323944
 ]

ASF GitHub Bot logged work on BEAM-8355:


Author: ASF GitHub Bot
Created on: 05/Oct/19 15:18
Start Date: 05/Oct/19 15:18
Worklog Time Spent: 10m 
  Work Description: chadrik commented on pull request #9735: [BEAM-8355] 
Add a standard boolean coder
URL: https://github.com/apache/beam/pull/9735
 
 
   This involves making the current java BooleanCoder a standard coder, and 
implementing an equivalent coder in python
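As a rough sketch of what such an equivalent Python coder could look like (assuming the standard encoding mirrors Java's single-byte BooleanCoder; this is not the actual PR code):

```python
def encode_bool(value):
    # Single byte: 0x01 for True, 0x00 for False, matching Java's
    # BooleanCoder (assumed here to be the standard encoding).
    return b"\x01" if value else b"\x00"

def decode_bool(data):
    if data == b"\x01":
        return True
    if data == b"\x00":
        return False
    raise ValueError("invalid boolean encoding: %r" % (data,))
```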
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   

   
   [Build-status badge table for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners; badge links truncated in the archive.]

[jira] [Created] (BEAM-8355) Make BooleanCoder a standard coder

2019-10-05 Thread Chad Dombrova (Jira)
Chad Dombrova created BEAM-8355:
---

 Summary: Make BooleanCoder a standard coder
 Key: BEAM-8355
 URL: https://issues.apache.org/jira/browse/BEAM-8355
 Project: Beam
  Issue Type: New Feature
  Components: beam-model, sdk-java-core, sdk-py-core
Reporter: Chad Dombrova
Assignee: Chad Dombrova


This involves making the current Java BooleanCoder a standard coder, and 
implementing an equivalent coder in Python.
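A minimal sketch of the idea (illustrative only: `SimpleBooleanCoder` is a made-up name, and the actual byte format of the standard coder is defined by the Beam portability model, not by this snippet):

```python
class SimpleBooleanCoder:
    """Illustrative single-byte boolean coder (hypothetical name).

    A standard coder must produce identical bytes in every SDK; this
    sketch only shows the shape of the idea, not Beam's actual wire
    format or coder API.
    """

    def encode(self, value):
        # One byte: 0x01 for True, 0x00 for False.
        return b"\x01" if value else b"\x00"

    def decode(self, data):
        return data == b"\x01"


coder = SimpleBooleanCoder()
print(coder.encode(True))                 # b'\x01'
print(coder.decode(coder.encode(False)))  # False
```

The point of standardizing is that any SDK implementing the same byte mapping can decode the other's output without a language-specific coder registration.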



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7322) PubSubIO watermark does not advance for very low volumes

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7322?focusedWorklogId=323941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323941
 ]

ASF GitHub Bot logged work on BEAM-7322:


Author: ASF GitHub Bot
Created on: 05/Oct/19 15:13
Start Date: 05/Oct/19 15:13
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #8598: [BEAM-7322] Add 
threshold to PubSub unbounded source
URL: https://github.com/apache/beam/pull/8598#issuecomment-538658619
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323941)
Time Spent: 2h 10m  (was: 2h)

> PubSubIO watermark does not advance for very low volumes
> 
>
> Key: BEAM-7322
> URL: https://issues.apache.org/jira/browse/BEAM-7322
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Tim Sell
>Priority: Minor
> Attachments: data.json
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> I have identified an issue where the watermark does not advance when using 
> the beam PubSubIO when volumes are very low.
> I have created a mini example project to demonstrate the behaviour with a 
> python script for generating messages at different frequencies:
> https://github.com/tims/beam/tree/pubsub-watermark-example/pubsub-watermark 
> [note: this is in a directory of a Beam fork for corp hoop jumping 
> convenience on my end, it is not intended for merging].
> The behaviour is easily replicated if you apply a fixed window triggering 
> after the watermark passes the end of the window.
> {code}
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
> .apply(ParDo.of(new ParseScoreEventFn()))
> 
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
> .triggering(AfterWatermark.pastEndOfWindow())
> .withAllowedLateness(Duration.standardSeconds(60))
> .discardingFiredPanes())
> .apply(MapElements.into(kvs(strings(), integers()))
> .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), 
> scoreEvent.getScore())))
> .apply(Count.perKey())
> .apply(ParDo.of(Log.of("counted per key")));
> {code}
> With this triggering, using both the Flink local runner and the direct runner, 
> panes will be fired after a long delay (minutes) for low frequencies of 
> messages in pubsub (seconds). The biggest issue is that it seems no panes 
> will ever be emitted if you just send a few events and stop. This is 
> particularly likely to trip up people new to Beam.
> If I change the triggering to have early firings I get exactly the emitted 
> panes that you would expect.
> {code}
> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
> .triggering(AfterWatermark.pastEndOfWindow()
> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> .alignedTo(Duration.standardSeconds(60))))
> .withAllowedLateness(Duration.standardSeconds(60))
> .discardingFiredPanes())
> {code}
> I can use any variation of early firing triggers and they work as expected.
> We believe that the watermark is not advancing when the volume is too low 
> because of the sampling that PubSubIO does to determine its watermark. It 
> just never has a large enough sample. 
> This problem occurs in the direct runner and the Flink runner, but not in the 
> Dataflow runner (Dataflow uses its own PubSubIO implementation, which has 
> access to internal details of Pub/Sub and so doesn't need to do any sampling).
> For extra context from the user@ list:
> *Kenneth Knowles:*
> Thanks to your info, I think it is the configuration of MovingFunction [1] 
> that is the likely culprit, but I don't totally understand why. It is 
> configured like so:
>  - store 60 seconds of data
>  - update data every 5 seconds
>  - require at least 10 messages to be 'significant'
>  - require messages from at least 2 distinct 5-second update periods to be 
> 'significant'
> I would expect a rate of 1 message per second to satisfy this. I may have 
> read something wrong.
> Have you filed an issue in Jira [2]?
> Kenn
> 
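As a rough, hypothetical model of the four criteria Kenn lists (a sketch only, not Beam's actual `MovingFunction` implementation), the significance check can be written as:

```python
def is_significant(timestamps, now, window=60, bucket=5,
                   min_messages=10, min_buckets=2):
    """Rough model of the significance criteria described above.

    timestamps: message arrival times in seconds. Checks whether the
    last `window` seconds contain at least `min_messages` messages
    spread over at least `min_buckets` distinct `bucket`-second
    update periods. Hypothetical sketch, not Beam's MovingFunction.
    """
    recent = [t for t in timestamps if now - window < t <= now]
    periods = {int(t // bucket) for t in recent}
    return len(recent) >= min_messages and len(periods) >= min_buckets


# One message per second easily satisfies both criteria,
# consistent with Kenn's expectation...
print(is_significant(list(range(60)), now=60))  # True
# ...while a trickle of one message every 30 seconds does not.
print(is_significant([0, 30, 60], now=60))      # False
```

Under this model a steady 1 message/second stream should be 'significant', which is why the observed stall at that rate was surprising; the real culprit may lie in details this sketch omits.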

[jira] [Work logged] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-2879?focusedWorklogId=323925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323925
 ]

ASF GitHub Bot logged work on BEAM-2879:


Author: ASF GitHub Bot
Created on: 05/Oct/19 13:21
Start Date: 05/Oct/19 13:21
Worklog Time Spent: 10m 
  Work Description: steveniemitz commented on pull request #9665: 
[BEAM-2879] Support writing data to BigQuery via avro
URL: https://github.com/apache/beam/pull/9665#discussion_r331746527
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AbstractRowWriter.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.UUID;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Writes {@link TableRow} objects out to a file. Used when doing batch load 
jobs into BigQuery. */
+abstract class AbstractRowWriter<T> implements AutoCloseable {
 
 Review comment:
   nit: I don't usually write java, is the style convention `AbstractDerp` for 
base classes like this?  or should this just be `RowWriter`?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323925)
Time Spent: 1h  (was: 50m)

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Black Phoenix
>Assignee: Steve Niemitz
>Priority: Minor
>  Labels: starter
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Before being loaded in BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-2879?focusedWorklogId=323922&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323922
 ]

ASF GitHub Bot logged work on BEAM-2879:


Author: ASF GitHub Bot
Created on: 05/Oct/19 13:15
Start Date: 05/Oct/19 13:15
Worklog Time Spent: 10m 
  Work Description: steveniemitz commented on pull request #9665: 
[BEAM-2879] Support writing data to BigQuery via avro
URL: https://github.com/apache/beam/pull/9665#discussion_r331746343
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
 ##
 @@ -243,8 +243,8 @@ public void testConvertBigQuerySchemaToAvroSchema() {
 Schema.create(Type.NULL),
 Schema.createRecord(
 "scion",
-"org.apache.beam.sdk.io.gcp.bigquery",
 "Translated Avro Schema for scion",
+"org.apache.beam.sdk.io.gcp.bigquery",
 
 Review comment:
   oh no actually, this is pretty important!  The previous version flipped the 
namespace and description parameters, resulting in an invalid namespace.  See 
the corresponding change up in BigQueryAvroUtils.
   
   Really avro's fault for making a function with 3 string parameters...
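The mix-up is easy to make because Avro's `Schema.createRecord(name, doc, namespace, isError)` takes consecutive positional strings. A toy Python illustration of why swapping the doc and namespace arguments yields an invalid namespace (the validator below is a simplified stand-in, not Avro's actual check):

```python
import re

# Simplified namespace rule: dot-separated identifiers, as in Avro namespaces.
NAMESPACE_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$")


def create_record(name, doc, namespace):
    """Toy stand-in for Schema.createRecord: three positional strings
    in a row make argument swaps silent and easy."""
    if not NAMESPACE_RE.match(namespace):
        raise ValueError("invalid namespace: %r" % namespace)
    return {"name": name, "doc": doc, "namespace": namespace}


# Correct argument order passes validation:
create_record("scion", "Translated Avro Schema for scion",
              "org.apache.beam.sdk.io.gcp.bigquery")

# The swapped order (the bug fixed in this diff) puts a description
# sentence where the namespace belongs, which is not a valid namespace:
try:
    create_record("scion", "org.apache.beam.sdk.io.gcp.bigquery",
                  "Translated Avro Schema for scion")
except ValueError as e:
    print(e)
```

Keyword arguments, or a builder, would make this class of bug impossible; with positional strings only a downstream validator catches it.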
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323922)
Time Spent: 50m  (was: 40m)

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Black Phoenix
>Assignee: Steve Niemitz
>Priority: Minor
>  Labels: starter
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Before being loaded in BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-2879?focusedWorklogId=323921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323921
 ]

ASF GitHub Bot logged work on BEAM-2879:


Author: ASF GitHub Bot
Created on: 05/Oct/19 13:13
Start Date: 05/Oct/19 13:13
Worklog Time Spent: 10m 
  Work Description: steveniemitz commented on issue #9665: [BEAM-2879] 
Support writing data to BigQuery via avro
URL: https://github.com/apache/beam/pull/9665#issuecomment-538648713
 
 
   Thanks for the thoughts!  My comments inline
   
   > This is just a brain dump of what I'm thinking...
   > 
   > I wonder whether we need the `AvroWriteRequest`, and the Avro schema. I 
guess we do, as the `InputElement` (whatever it is) + the Avro schema are all 
one needs to build the `GenericRecord`. Having the `AvroWriteRequest` may help 
make the formatting function as concise as possible
   
   I really went back and forth on this a few times.  We could use 
`SerializableBiFunction` here, but if in the future we ever wanted to add 
another parameter, it'd be a breaking change.  This way we can just add a field 
to the class.  This follows the same pattern as read does, where it takes a 
`SchemaAndRecord` as an input.  You do need both the avro schema and the 
element though in order to support more advanced cases w/ DynamicDestinations, 
etc.  Plus avro schemas themselves aren't easily serializable (until avro 1.9) 
so users can't simply create a closure over them.
   
   I do hate the name though, if you can think of anything better I'd love to 
rename this!

   > As for supporting Beam schemas + avro files, one could have a 
`useBeamSchemaForAvroFiles()`... though it's a little strange
   > 
   > Another option is to have `useBeamSchema`, and a pre-coded avro formatting 
function called something like ... `BigQueryIOUtils.beamRowToAvroRecord()`. 
Though this is a little awkward too.
   
   Yeah I struggled with this as well.  The only thing stopping us from having 
a version that supports beam schemas is the interface.  
`useBeamSchemaForAvroFiles` is a pretty reasonable name. 
   
   > Overall, I like using `AvroWriteRequest` as input for the avro format 
function... and for supporting Beam schemas, it may be that 
`useBeamSchemaForAvroFiles` (or some better name) is the more reasonable 
options.
   > WDYT?
   
I'd be up for adding that in a follow-up PR.  I also have some ideas around 
`writeGenericRecords()` I want to play around with (that would also use beam 
schemas, similar to `AvroIO.writeGenericRecords()` )
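The extensibility argument for a request object over a `SerializableBiFunction` can be sketched in Python (the names are modeled on the Java discussion above, not a real Beam API):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class AvroWriteRequest:
    """Bundles the inputs to a user-supplied formatting function.

    Illustrative names only. Adding a new field to this class later is
    backwards compatible, unlike adding a third parameter to a
    two-argument function interface, which would break every caller.
    """
    element: Any
    schema: Any  # Avro schema for the destination table


def format_record(request):
    # User code reads only the fields it needs, so it keeps working
    # unchanged if AvroWriteRequest grows new fields.
    return {"value": request.element}


req = AvroWriteRequest(element=42, schema=None)
print(format_record(req))  # {'value': 42}
```

This mirrors the read-side `SchemaAndRecord` pattern mentioned above: the wrapper is the stable contract, and its contents can evolve.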
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323921)
Time Spent: 40m  (was: 0.5h)

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Black Phoenix
>Assignee: Steve Niemitz
>Priority: Minor
>  Labels: starter
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Before being loaded in BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8351) Support passing in arbitrary KV pairs to sdk worker via external environment config

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8351?focusedWorklogId=323901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323901
 ]

ASF GitHub Bot logged work on BEAM-8351:


Author: ASF GitHub Bot
Created on: 05/Oct/19 10:42
Start Date: 05/Oct/19 10:42
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9730: [BEAM-8351] 
Support passing in arbitrary KV pairs to sdk worker via external environment 
config
URL: https://github.com/apache/beam/pull/9730#discussion_r331742440
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py
 ##
 @@ -300,6 +301,42 @@ def test__create_process_environment(self):
 command='run.sh',
 ).SerializeToString()))
 
+  def test__create_external_environment(self):
+self.assertEqual(
+PortableRunner._create_environment(PipelineOptions.from_dictionary({
+'environment_type': "EXTERNAL",
+'environment_config': 'localhost:5',
+})), beam_runner_api_pb2.Environment(
+urn=common_urns.environments.EXTERNAL.urn,
+payload=beam_runner_api_pb2.ExternalPayload(
+endpoint=endpoints_pb2.ApiServiceDescriptor(
+url='localhost:5')
+).SerializeToString()))
+self.assertEqual(
+PortableRunner._create_environment(PipelineOptions.from_dictionary({
+'environment_type': "EXTERNAL",
+'environment_config': ' {"url":"localhost:5", '
 
 Review comment:
   Should we also be testing the case without any space at the beginning?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323901)
Time Spent: 2h 20m  (was: 2h 10m)

> Support passing in arbitrary KV pairs to sdk worker via external environment 
> config
> ---
>
> Key: BEAM-8351
> URL: https://issues.apache.org/jira/browse/BEAM-8351
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core, sdk-py-harness
>Reporter: Wanqi Lyu
>Assignee: Wanqi Lyu
>Priority: Minor
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Originally, the environment config for environment type of EXTERNAL only 
> supports passing in a URL for the external worker pool; we want to support 
> passing in arbitrary KV pairs to the SDK worker via the external environment 
> config, so that when starting the SDK harness we can get the values from 
> `StartWorkerRequest.params`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8351) Support passing in arbitrary KV pairs to sdk worker via external environment config

2019-10-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8351?focusedWorklogId=323900&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323900
 ]

ASF GitHub Bot logged work on BEAM-8351:


Author: ASF GitHub Bot
Created on: 05/Oct/19 10:42
Start Date: 05/Oct/19 10:42
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9730: [BEAM-8351] 
Support passing in arbitrary KV pairs to sdk worker via external environment 
config
URL: https://github.com/apache/beam/pull/9730#discussion_r331742343
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/portable_runner.py
 ##
 @@ -129,11 +129,25 @@ def _create_environment(options):
   env=(config.get('env') or '')
   ).SerializeToString())
 elif environment_urn == common_urns.environments.EXTERNAL.urn:
+  def looks_like_json(environment_config):
+import re
+return re.match(r'\s*\{.*\}\s*$', environment_config)
+
+  if looks_like_json(portable_options.environment_config):
+config = json.loads(portable_options.environment_config)
+url = config.pop('url', None)
 
 Review comment:
   ```suggestion
   url = config.get('url', None)
   ```
   
   Why pop?
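A plausible reading, based on the issue description: `pop` consumes the `url` key so that everything left in the dict can be forwarded as worker params, whereas `get` would leave `url` mixed in with them. A small illustration with made-up values:

```python
import json

# Hypothetical environment_config (endpoint and worker param are made up).
environment_config = '{"url": "localhost:50000", "LOG_LEVEL": "DEBUG"}'

config = json.loads(environment_config)
url = config.pop('url', None)  # consume the endpoint...
params = config                # ...the leftover entries become params

print(url)     # localhost:50000
print(params)  # {'LOG_LEVEL': 'DEBUG'}
```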
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 323900)
Time Spent: 2h 20m  (was: 2h 10m)

> Support passing in arbitrary KV pairs to sdk worker via external environment 
> config
> ---
>
> Key: BEAM-8351
> URL: https://issues.apache.org/jira/browse/BEAM-8351
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core, sdk-py-harness
>Reporter: Wanqi Lyu
>Assignee: Wanqi Lyu
>Priority: Minor
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Originally, the environment config for environment type of EXTERNAL only 
> supports passing in a URL for the external worker pool; we want to support 
> passing in arbitrary KV pairs to the SDK worker via the external environment 
> config, so that when starting the SDK harness we can get the values from 
> `StartWorkerRequest.params`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)