[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=299185=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299185
 ]

ASF GitHub Bot logged work on BEAM-5428:


Author: ASF GitHub Bot
Created on: 22/Aug/19 05:57
Start Date: 22/Aug/19 05:57
Worklog Time Spent: 10m 
  Work Description: rakeshcusat commented on pull request #9374: 
[BEAM-5428] Implement Runner support for cache tokens
URL: https://github.com/apache/beam/pull/9374#discussion_r316506735
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -248,33 +253,45 @@ private StateRequestHandler 
getStateRequestHandler(ExecutableStage executableSta
 return StateRequestHandlers.delegateBasedUponType(handlerMap);
   }
 
-  private static class BagUserStateFactory
+  static class BagUserStateFactory
   implements StateRequestHandlers.BagUserStateHandlerFactory {
 
+/** Upper limit of the number of valid cache tokens to hand out to the SDK. */
+private static final int MAX_CACHE_SIZE = 100;
+
 private final StateInternals stateInternals;
 private final KeyedStateBackend keyedStateBackend;
 private final Lock stateBackendLock;
+/** Cache is scoped by state id. */
+private final Cache cacheTokens;
+
+private final IdGenerator cacheTokenGenerator;
 
-private BagUserStateFactory(
+BagUserStateFactory(
 StateInternals stateInternals,
 KeyedStateBackend keyedStateBackend,
 Lock stateBackendLock) {
 
   this.stateInternals = stateInternals;
   this.keyedStateBackend = keyedStateBackend;
   this.stateBackendLock = stateBackendLock;
+  this.cacheTokens = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE).build();
+  this.cacheTokenGenerator = IdGenerators.incrementingLongs();
 
 Review comment:
   I am assuming this will be used in a later part of the implementation.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299185)
Time Spent: 1h  (was: 50m)

> Implement cross-bundle state caching.
> -
>
> Key: BEAM-5428
> URL: https://issues.apache.org/jira/browse/BEAM-5428
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Robert Bradshaw
>Assignee: Rakesh Kumar
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Tech spec: 
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document: 
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link: 
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=299184=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299184
 ]

ASF GitHub Bot logged work on BEAM-5428:


Author: ASF GitHub Bot
Created on: 22/Aug/19 05:56
Start Date: 22/Aug/19 05:56
Worklog Time Spent: 10m 
  Work Description: rakeshcusat commented on pull request #9374: 
[BEAM-5428] Implement Runner support for cache tokens
URL: https://github.com/apache/beam/pull/9374#discussion_r316506735
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -248,33 +253,45 @@ private StateRequestHandler 
getStateRequestHandler(ExecutableStage executableSta
 return StateRequestHandlers.delegateBasedUponType(handlerMap);
   }
 
-  private static class BagUserStateFactory
+  static class BagUserStateFactory
   implements StateRequestHandlers.BagUserStateHandlerFactory {
 
+/** Upper limit of the number of valid cache tokens to hand out to the SDK. */
+private static final int MAX_CACHE_SIZE = 100;
+
 private final StateInternals stateInternals;
 private final KeyedStateBackend keyedStateBackend;
 private final Lock stateBackendLock;
+/** Cache is scoped by state id. */
+private final Cache cacheTokens;
+
+private final IdGenerator cacheTokenGenerator;
 
-private BagUserStateFactory(
+BagUserStateFactory(
 StateInternals stateInternals,
 KeyedStateBackend keyedStateBackend,
 Lock stateBackendLock) {
 
   this.stateInternals = stateInternals;
   this.keyedStateBackend = keyedStateBackend;
   this.stateBackendLock = stateBackendLock;
+  this.cacheTokens = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE).build();
+  this.cacheTokenGenerator = IdGenerators.incrementingLongs();
 
 Review comment:
   I am assuming this will be used in a later part of the implementation.
 



Issue Time Tracking
---

Worklog Id: (was: 299184)
Time Spent: 50m  (was: 40m)

> Implement cross-bundle state caching.
> -
>
> Key: BEAM-5428
> URL: https://issues.apache.org/jira/browse/BEAM-5428
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Robert Bradshaw
>Assignee: Rakesh Kumar
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Tech spec: 
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document: 
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link: 
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]





[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=299183=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299183
 ]

ASF GitHub Bot logged work on BEAM-5428:


Author: ASF GitHub Bot
Created on: 22/Aug/19 05:54
Start Date: 22/Aug/19 05:54
Worklog Time Spent: 10m 
  Work Description: rakeshcusat commented on pull request #9374: 
[BEAM-5428] Implement Runner support for cache tokens
URL: https://github.com/apache/beam/pull/9374#discussion_r316506303
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -248,33 +253,45 @@ private StateRequestHandler 
getStateRequestHandler(ExecutableStage executableSta
 return StateRequestHandlers.delegateBasedUponType(handlerMap);
   }
 
-  private static class BagUserStateFactory
+  static class BagUserStateFactory
   implements StateRequestHandlers.BagUserStateHandlerFactory {
 
+/** Upper limit of the number of valid cache tokens to hand out to the SDK. */
+private static final int MAX_CACHE_SIZE = 100;
+
 private final StateInternals stateInternals;
 private final KeyedStateBackend keyedStateBackend;
 private final Lock stateBackendLock;
+/** Cache is scoped by state id. */
 
 Review comment:
   Is it possible that two DoFns in the pipeline accidentally use the same 
state id? In that case the state ids could collide. I think we also need other 
parameters in the cache key.
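
The collision concern above could be addressed with a composite key. The following is a minimal plain-Java sketch, not code from the pull request: `StateCacheKey`, its fields, and the example ids are hypothetical, and whether the factory is already scoped per transform is not settled in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: transform id plus user state id. */
final class StateCacheKey {
  private final String transformId;
  private final String userStateId;

  StateCacheKey(String transformId, String userStateId) {
    this.transformId = Objects.requireNonNull(transformId);
    this.userStateId = Objects.requireNonNull(userStateId);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof StateCacheKey)) return false;
    StateCacheKey other = (StateCacheKey) o;
    return transformId.equals(other.transformId) && userStateId.equals(other.userStateId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(transformId, userStateId);
  }
}

final class StateCacheKeyDemo {
  /** Returns how many distinct entries exist for two DoFns sharing a state id. */
  static int distinctEntries() {
    Map<StateCacheKey, Long> tokens = new HashMap<>();
    tokens.put(new StateCacheKey("doFnA", "buffer"), 1L);
    tokens.put(new StateCacheKey("doFnB", "buffer"), 2L);
    // Re-inserting an equal key overwrites instead of adding a new entry.
    tokens.put(new StateCacheKey("doFnA", "buffer"), 3L);
    return tokens.size();
  }
}
```

With the state id alone as the key, the two DoFns in this example would share one entry; with the composite key they stay separate.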
 



Issue Time Tracking
---

Worklog Id: (was: 299183)
Time Spent: 40m  (was: 0.5h)

> Implement cross-bundle state caching.
> -
>
> Key: BEAM-5428
> URL: https://issues.apache.org/jira/browse/BEAM-5428
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Robert Bradshaw
>Assignee: Rakesh Kumar
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Tech spec: 
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document: 
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link: 
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]





[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=299181=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299181
 ]

ASF GitHub Bot logged work on BEAM-5428:


Author: ASF GitHub Bot
Created on: 22/Aug/19 05:52
Start Date: 22/Aug/19 05:52
Worklog Time Spent: 10m 
  Work Description: rakeshcusat commented on pull request #9374: 
[BEAM-5428] Implement Runner support for cache tokens
URL: https://github.com/apache/beam/pull/9374#discussion_r316505707
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -248,33 +253,45 @@ private StateRequestHandler 
getStateRequestHandler(ExecutableStage executableSta
 return StateRequestHandlers.delegateBasedUponType(handlerMap);
   }
 
-  private static class BagUserStateFactory
+  static class BagUserStateFactory
   implements StateRequestHandlers.BagUserStateHandlerFactory {
 
+/** Upper limit of the number of valid cache tokens to hand out to the SDK. */
+private static final int MAX_CACHE_SIZE = 100;
 
 Review comment:
   I think it is good for the initial implementation. Do we want to make it 
configurable later on?
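
To illustrate what a configurable limit might look like, here is a minimal sketch under stated assumptions. It uses a `LinkedHashMap`-based LRU map as a standard-library stand-in for the Guava cache in the diff, and the class name, the factory method, and any property name passed to it are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Size-bounded LRU map; a stdlib stand-in for the Guava cache used in the diff. */
final class BoundedTokenCache<K, V> extends LinkedHashMap<K, V> {
  private static final int DEFAULT_MAX_SIZE = 100;
  private final int maxSize;

  BoundedTokenCache(int maxSize) {
    super(16, 0.75f, true); // accessOrder = true gives least-recently-used eviction
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    this.maxSize = maxSize;
  }

  /** Reads the limit from a (hypothetical) system property, falling back to 100. */
  static <K, V> BoundedTokenCache<K, V> fromSystemProperty(String property) {
    return new BoundedTokenCache<>(Integer.getInteger(property, DEFAULT_MAX_SIZE));
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Called after each insertion; evict once the bound is exceeded.
    return size() > maxSize;
  }
}
```

In Beam itself a pipeline option would probably be a more natural home for such a setting than a system property; the property is used here only to keep the sketch self-contained.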
 



Issue Time Tracking
---

Worklog Id: (was: 299181)
Time Spent: 0.5h  (was: 20m)

> Implement cross-bundle state caching.
> -
>
> Key: BEAM-5428
> URL: https://issues.apache.org/jira/browse/BEAM-5428
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Robert Bradshaw
>Assignee: Rakesh Kumar
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Tech spec: 
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document: 
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link: 
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]





[jira] [Commented] (BEAM-7049) Merge multiple input to one BeamUnionRel

2019-08-21 Thread Rui Wang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912980#comment-16912980
 ] 

Rui Wang commented on BEAM-7049:


Although most use cases might only need <5 UNIONs, it is still 
helpful to have a general n-input implementation. 

Does this implementation help:
1. KeyedPCollectionTuple.of(), and then for (i -> 1, n) 
{KeyedPCollectionTuple.and(current i);}
2. SetOperatorFilteringDoFn(List so you pretty much 
will have an n-step loop to finish the "union" operation.
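
The n-input loop described above can be sketched without Beam classes. In this plain-Java analogy the position of an input in the list plays the role of its tag, so no per-input variable is needed; `NaryUnionSketch` and its methods are hypothetical and only mirror the idea of co-grouping n inputs and filtering once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NaryUnionSketch {
  /** Groups rows from n inputs in one pass; counts[i] is the row's multiplicity in input i. */
  static Map<String, int[]> coGroup(List<List<String>> inputs) {
    Map<String, int[]> grouped = new LinkedHashMap<>();
    for (int tag = 0; tag < inputs.size(); tag++) { // the index plays the role of a tag
      for (String row : inputs.get(tag)) {
        grouped.computeIfAbsent(row, r -> new int[inputs.size()])[tag]++;
      }
    }
    return grouped;
  }

  /** UNION (distinct) emits each row once; UNION ALL emits it once per occurrence. */
  static List<String> union(List<List<String>> inputs, boolean all) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, int[]> e : coGroup(inputs).entrySet()) {
      int copies = all ? Arrays.stream(e.getValue()).sum() : 1;
      for (int i = 0; i < copies; i++) {
        out.add(e.getKey());
      }
    }
    return out;
  }
}
```

The single grouping pass stands in for the one shuffle that an n-input union would need, regardless of n.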


> Merge multiple input to one BeamUnionRel
> 
>
> Key: BEAM-7049
> URL: https://issues.apache.org/jira/browse/BEAM-7049
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Rui Wang
>Assignee: sridhar Reddy
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> BeamUnionRel assumes exactly two inputs and rejects more. So `a UNION b UNION c` 
> will have to be created as UNION(a, UNION(b, c)) and have two shuffles. If 
> BeamUnionRel can handle multiple inputs, we will have only one shuffle.





[jira] [Created] (BEAM-8031) Add Snippets for Patterns website

2019-08-21 Thread Reza ardeshir rokni (Jira)
Reza ardeshir rokni created BEAM-8031:
-

 Summary: Add Snippets for Patterns website
 Key: BEAM-8031
 URL: https://issues.apache.org/jira/browse/BEAM-8031
 Project: Beam
  Issue Type: Improvement
  Components: website
Reporter: Reza ardeshir rokni








[jira] [Work logged] (BEAM-6855) Side inputs are not supported when using the state API

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6855?focusedWorklogId=299146=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299146
 ]

ASF GitHub Bot logged work on BEAM-6855:


Author: ASF GitHub Bot
Created on: 22/Aug/19 04:11
Start Date: 22/Aug/19 04:11
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #9140: [BEAM-6855] Side 
inputs are not supported when using the state API
URL: https://github.com/apache/beam/pull/9140#issuecomment-523739372
 
 
   I think part of the confusion here is that Dataflow appears to use a 
different code path for this - StreamingSideInputDoFnRunner
 



Issue Time Tracking
---

Worklog Id: (was: 299146)
Time Spent: 3h 50m  (was: 3h 40m)

> Side inputs are not supported when using the state API
> --
>
> Key: BEAM-6855
> URL: https://issues.apache.org/jira/browse/BEAM-6855
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-dataflow, runner-direct
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7742) BigQuery File Loads to work well with load job size limits

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7742?focusedWorklogId=299143=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299143
 ]

ASF GitHub Bot logged work on BEAM-7742:


Author: ASF GitHub Bot
Created on: 22/Aug/19 04:05
Start Date: 22/Aug/19 04:05
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #9242: [BEAM-7742] Partition 
files in BQFL to cater to quotas & limits
URL: https://github.com/apache/beam/pull/9242#issuecomment-523738404
 
 
   Run Python 3.5 PostCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299143)
Time Spent: 4h  (was: 3h 50m)

> BigQuery File Loads to work well with load job size limits
> --
>
> Key: BEAM-7742
> URL: https://issues.apache.org/jira/browse/BEAM-7742
> Project: Beam
>  Issue Type: Improvement
>  Components: io-py-gcp
>Reporter: Pablo Estrada
>Assignee: Tanay Tummalapalli
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Load jobs into BigQuery have a number of limitations: 
> [https://cloud.google.com/bigquery/quotas#load_jobs]
>  
> Currently, the Python BQ sink implemented in `bigquery_file_loads.py` does 
> not handle these limitations well. Improvements need to be made to the 
> implementation, to:
>  * Decide to use temp_tables dynamically at pipeline execution
>  * Add code to determine when a load job to a single destination needs to be 
> partitioned into multiple jobs.
>  * When this happens, then we definitely need to use temp_tables, in case one 
> of the two load jobs fails, and the pipeline is rerun.
> Tanay, would you be able to look at this?
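
The partitioning step in the list above can be sketched as a greedy split. This is an illustration in plain Java, not the `bigquery_file_loads.py` code: `LoadJobPartitioner` is a hypothetical name, and `maxFiles` and `maxBytes` are parameters standing in for whatever limits the quota page specifies.

```java
import java.util.ArrayList;
import java.util.List;

final class LoadJobPartitioner {
  /**
   * Greedily packs file sizes into partitions so that each partition stays within
   * maxFiles entries and maxBytes total. A single file larger than maxBytes gets
   * its own partition, since it cannot be split here.
   */
  static List<List<Long>> partition(List<Long> fileSizes, int maxFiles, long maxBytes) {
    List<List<Long>> partitions = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : fileSizes) {
      boolean full = current.size() >= maxFiles || currentBytes + size > maxBytes;
      if (full && !current.isEmpty()) {
        // Close the current partition and start a new one for this file.
        partitions.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      partitions.add(current);
    }
    return partitions;
  }
}
```

When the result has more than one partition for a destination, that is the case in which the description says temp tables are needed.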





[jira] [Comment Edited] (BEAM-7049) Merge multiple input to one BeamUnionRel

2019-08-21 Thread sridhar Reddy (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912882#comment-16912882
 ] 

sridhar Reddy edited comment on BEAM-7049 at 8/22/19 3:35 AM:
--

Just including UnionMergeRule.INSTANCE in BeamRuleSets makes union with 3 
operands work without any hardcoding. 4 operands may need a new merge rule 
altogether. 
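
As a rough illustration of what merging nested unions amounts to, the sketch below flattens UNION(a, UNION(b, c)) into a single n-ary input list. It is plain Java with hypothetical `Node` and `flatten` names, not Calcite's UnionMergeRule, and it assumes all nested unions are of the same kind (for example all UNION ALL).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class UnionFlattenSketch {
  /** Hypothetical plan node: either a leaf scan (no inputs) or a union of inputs. */
  static final class Node {
    final String name;       // leaf table name, or "UNION"
    final List<Node> inputs; // empty for leaves

    Node(String name, Node... inputs) {
      this.name = name;
      this.inputs = Arrays.asList(inputs);
    }

    boolean isUnion() {
      return !inputs.isEmpty();
    }
  }

  /** Collects the leaves of arbitrarily nested unions, left to right. */
  static List<String> flatten(Node node) {
    List<String> leaves = new ArrayList<>();
    collect(node, leaves);
    return leaves;
  }

  private static void collect(Node node, List<String> leaves) {
    if (!node.isUnion()) {
      leaves.add(node.name);
      return;
    }
    for (Node input : node.inputs) {
      collect(input, leaves);
    }
  }
}
```

The recursion handles any nesting depth, so in this simplified model four or more operands need no separate case.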

[~amaliujia]  are we limiting the number of operands to some small 
number, or are we generalizing to n? 

If we are generalizing to a number "n" then one of the sticking points I am 
facing is creating a KeyedPCollectionTuple [1], which takes a TupleTag as one of 
its parameters, and I am not sure whether there is a simple way to generate 
variables on the fly. If we just create a new TupleTag object without assigning 
it to a variable, then the following call will be affected and some changes will 
need to be cascaded down the line. 

[2]BeamSetOperatorsTransforms.SetOperatorFilteringDoFn()

What do you suggest?

 

1.[https://github.com/apache/beam/blob/3561100b30b64e4ac857afbf6e5016dfaf2ecc22/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java#L86]

2. 
[https://github.com/apache/beam/blob/3561100b30b64e4ac857afbf6e5016dfaf2ecc22/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java#L100]

 

 

 

 

 


was (Author: sridharg):
Just including UnionMergeRule.INSTANCE in BeamRuleSets makes union with 3 
operands works without any hardcoding. 4 Operands may need a new merge rule 
altogether. 

[~amaliujia]  are we limiting the number of operands to some number to a small 
number? or are we generalizing to number n ? 

If we are generalizing to a number "n" then one of the sticky points I am 
facing is creating  KeyedPCollectionTuple [1] which takes TupleTag as one of 
its parameters and I am not sure if there is a way to generate on the fly?  If 
we just create a new TupleTag object without assigning to a variable then the 
following call will be affected and some changes need to be cascaded down the 
line. 

[2]BeamSetOperatorsTransforms.SetOperatorFilteringDoFn()

What do you suggest?

 

1.[https://github.com/apache/beam/blob/3561100b30b64e4ac857afbf6e5016dfaf2ecc22/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java#L86]

2. 
[https://github.com/apache/beam/blob/3561100b30b64e4ac857afbf6e5016dfaf2ecc22/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java#L100]

 

 

 

 

 

> Merge multiple input to one BeamUnionRel
> 
>
> Key: BEAM-7049
> URL: https://issues.apache.org/jira/browse/BEAM-7049
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Rui Wang
>Assignee: sridhar Reddy
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> BeamUnionRel assumes exactly two inputs and rejects more. So `a UNION b UNION c` 
> will have to be created as UNION(a, UNION(b, c)) and have two shuffles. If 
> BeamUnionRel can handle multiple inputs, we will have only one shuffle.





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299132
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:28
Start Date: 22/Aug/19 03:28
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316483610
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+   if err != nil {
+   log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+   }
+
+   for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+   switch err {
+   case lockfile.ErrBusy, lockfile.ErrNotExist:
+   time.Sleep(5 * time.Second)
+   log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+   default:
+   log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+   }
+   }
+   defer lock.Unlock()
+
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   dir := filepath.Join(*semiPersistDir, "staged")
+
+   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+   if err != nil {
+   log.Fatalf("Failed to retrieve staged files: %v", err)
+   }
+
+   // TODO(herohde): the packages to install should be specified explicitly. It
+   // would also be possible to install the SDK in the Dockerfile.
+   if setupErr := installSetupPackages(files, dir); setupErr != nil {
 
 Review comment:
   In the future we could look into inverting this logic by extracting the 
exactly-once logic into a separate function that can be shared between the SDK 
containers.
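
A shared exactly-once helper of the kind suggested above might look like the following Java sketch. It mirrors the check, lock, re-check pattern from the diff using `java.nio` file locks; `RunOnce`, `Action`, and the marker and lock paths are hypothetical, and this is not the boot.go implementation.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class RunOnce {
  interface Action {
    void run() throws IOException;
  }

  /**
   * Runs the action at most once per marker file. Returns true if this call ran it.
   * The file lock guards against other processes; it is not a substitute for
   * in-JVM synchronization between threads.
   */
  static boolean runOnce(Path marker, Path lockFile, Action action) throws IOException {
    if (Files.exists(marker)) {
      return false; // fast path: already complete
    }
    try (FileChannel channel =
            FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock = channel.lock()) { // blocks until the lock is free
      if (Files.exists(marker)) {
        return false; // another process finished while we waited
      }
      action.run();
      Files.createFile(marker); // written only after the action succeeded
      return true;
    }
  }
}
```

The second existence check after acquiring the lock is what keeps a waiting process from repeating work that finished while it was blocked.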
 



Issue Time Tracking
---

Worklog Id: (was: 299132)
Time Spent: 5.5h  (was: 5h 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299134=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299134
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:28
Start Date: 22/Aug/19 03:28
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316483610
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+   if err != nil {
+   log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+   }
+
+   for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+   switch err {
+   case lockfile.ErrBusy, lockfile.ErrNotExist:
+   time.Sleep(5 * time.Second)
+   log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+   default:
+   log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+   }
+   }
+   defer lock.Unlock()
+
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   dir := filepath.Join(*semiPersistDir, "staged")
+
+   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+   if err != nil {
+   log.Fatalf("Failed to retrieve staged files: %v", err)
+   }
+
+   // TODO(herohde): the packages to install should be specified explicitly. It
+   // would also be possible to install the SDK in the Dockerfile.
+   if setupErr := installSetupPackages(files, dir); setupErr != nil {
 
 Review comment:
   In the future we could look into inverting this logic by extracting the 
exactly-once part into a separate function that can be shared between the SDK 
containers.
 



Issue Time Tracking
---

Worklog Id: (was: 299134)
Time Spent: 5h 40m  (was: 5.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299131=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299131
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:26
Start Date: 22/Aug/19 03:26
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398
 
 
   This is the follow-up for exactly once artifact retrieval. 
   
   
   

[jira] [Work logged] (BEAM-7711) Support DATETIME as a logical type in BeamSQL

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7711?focusedWorklogId=299127=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299127
 ]

ASF GitHub Bot logged work on BEAM-7711:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:15
Start Date: 22/Aug/19 03:15
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #8994: 
[BEAM-7711] Add DATETIME as a logical type in BeamSQL
URL: https://github.com/apache/beam/pull/8994#discussion_r316481841
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 ##
 @@ -154,6 +163,7 @@ public static boolean isStringType(FieldType fieldType) {
   .put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
   .put(TIMESTAMP, SqlTypeName.TIMESTAMP)
   .put(TIMESTAMP_WITH_LOCAL_TZ, 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+  .put(DATETIME, SqlTypeName.TIMESTAMP)
 
 Review comment:
   I believe that `TIMESTAMP` is already without time zone. See 
https://issues.apache.org/jira/browse/CALCITE-2394 for some discussion I have 
had about some of this.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299127)
Time Spent: 3h  (was: 2h 50m)

> Support DATETIME as a logical type in BeamSQL
> -
>
> Key: BEAM-7711
> URL: https://issues.apache.org/jira/browse/BEAM-7711
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Rui Wang
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> DATETIME as a type represents a year, month, day, hour, minute, second, and 
> subsecond (millis).
> It ranges from 0001-01-01 00:00:00 to -12-31 23:59:59.999.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (BEAM-7049) Merge multiple input to one BeamUnionRel

2019-08-21 Thread sridhar Reddy (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912882#comment-16912882
 ] 

sridhar Reddy commented on BEAM-7049:
-

Just including UnionMergeRule.INSTANCE in BeamRuleSets makes a union with 3 
operands work without any hardcoding. 4 operands may need a new merge rule 
altogether. 

[~amaliujia] are we limiting the number of operands to some small number, or 
are we generalizing to an arbitrary number n? 

If we are generalizing to n, one of the sticking points I am facing is 
creating the KeyedPCollectionTuple [1], which takes a TupleTag as one of its 
parameters, and I am not sure whether there is a way to generate one on the 
fly. If we just create a new TupleTag object without assigning it to a 
variable, then the following call will be affected and some changes will need 
to be cascaded down the line. 

[2]BeamSetOperatorsTransforms.SetOperatorFilteringDoFn()

What do you suggest?

 

1.[https://github.com/apache/beam/blob/3561100b30b64e4ac857afbf6e5016dfaf2ecc22/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java#L86]

2. 
[https://github.com/apache/beam/blob/3561100b30b64e4ac857afbf6e5016dfaf2ecc22/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java#L100]


> Merge multiple input to one BeamUnionRel
> 
>
> Key: BEAM-7049
> URL: https://issues.apache.org/jira/browse/BEAM-7049
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Rui Wang
>Assignee: sridhar Reddy
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> BeamUnionRel assumes exactly two inputs and rejects more. So `a UNION b UNION c` 
> has to be created as UNION(a, UNION(b, c)), which needs two shuffles. If 
> BeamUnionRel could handle multiple inputs, we would need only one shuffle.
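
The merge described in the issue can be illustrated with a small, library-free sketch. It assumes a hypothetical tuple representation of the plan, `('UNION', left, right)`, which is not Calcite's RelNode API and not how UnionMergeRule is implemented; it only shows nested unions collapsing into one n-ary input list. It also assumes every nested union is of the same kind (all UNION ALL or all UNION DISTINCT).

```python
def flatten_union(node):
  """Flattens nested ('UNION', ...) tuples into a flat list of inputs.

  The tuple encoding is a hypothetical stand-in for a relational plan node.
  """
  if isinstance(node, tuple) and node[0] == 'UNION':
    inputs = []
    for child in node[1:]:
      # Recurse so that unions nested at any depth are merged.
      inputs.extend(flatten_union(child))
    return inputs
  # Anything that is not a UNION node is a leaf input.
  return [node]


nested = ('UNION', 'a', ('UNION', 'b', 'c'))
merged = ['UNION'] + flatten_union(nested)
```

With the inputs flattened this way, a single n-ary union node replaces the chain of binary ones, which is what would let the work be done in one shuffle instead of two.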





[jira] [Work logged] (BEAM-7642) (Dataflow) Python AfterProcessingTime fires before defined time

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7642?focusedWorklogId=299106=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299106
 ]

ASF GitHub Bot logged work on BEAM-7642:


Author: ASF GitHub Bot
Created on: 22/Aug/19 01:30
Start Date: 22/Aug/19 01:30
Worklog Time Spent: 10m 
  Work Description: angoenka commented on pull request #9397: [BEAM-7642] 
Fix python sdk AfterProcessingTime unit discrepancy
URL: https://github.com/apache/beam/pull/9397#discussion_r316464552
 
 

 ##
 File path: sdks/python/apache_beam/transforms/trigger.py
 ##
 @@ -337,12 +337,12 @@ def from_runner_api(proto, context):
 proto.after_processing_time
 .timestamp_transforms[0]
 .delay
-.delay_millis))
+.delay_millis) // 1000)
 
 Review comment:
   We should change line 317 to
   `'', TimeDomain.REAL_TIME, context.get_current_time() + self.delay * 1000)`
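
A minimal sketch of the unit conversion under discussion, assuming (as the `// 1000` in the hunk suggests) that the Python trigger keeps its delay in seconds while the runner API proto stores `delay_millis`. The helper names are hypothetical and are not part of apache_beam; the point is only that both translation directions have to agree on the factor of 1000.

```python
# Hypothetical helpers for illustration; not part of apache_beam.
MILLIS_PER_SECOND = 1000


def delay_seconds_to_proto_millis(delay_seconds):
  """Converts a trigger delay in seconds to the proto's milliseconds."""
  return int(delay_seconds * MILLIS_PER_SECOND)


def proto_millis_to_delay_seconds(delay_millis):
  """Converts proto milliseconds back to whole seconds (the `// 1000`)."""
  return delay_millis // MILLIS_PER_SECOND


ten_hours = 10 * 60 * 60  # The 10h delay from the StackOverflow question.
as_millis = delay_seconds_to_proto_millis(ten_hours)
round_tripped = proto_millis_to_delay_seconds(as_millis)
```

If only one direction applies the factor, a trigger that is serialized and then deserialized ends up with a delay that is off by three orders of magnitude.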
 



Issue Time Tracking
---

Worklog Id: (was: 299106)
Time Spent: 0.5h  (was: 20m)

> (Dataflow) Python AfterProcessingTime fires before defined time
> ---
>
> Key: BEAM-7642
> URL: https://issues.apache.org/jira/browse/BEAM-7642
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Mikhail Gryzykhin
>Assignee: Yichi Zhang
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> [https://stackoverflow.com/questions/56700913/why-my-data-in-apache-beam-is-emitted-after-a-few-minutes-instead-of-10h]
>  
> A user on StackOverflow reports that AfterProcessingTime on a global window 
> fires before the configured delay (10h). It emits events within the first 
> minutes/seconds and then drops the rest because the window has closed.
> The code the user provided seems valid. I tried to verify the time units 
> accepted by the method, but I couldn't trace them all the way through the code.





[jira] [Work logged] (BEAM-7642) (Dataflow) Python AfterProcessingTime fires before defined time

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7642?focusedWorklogId=299105=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299105
 ]

ASF GitHub Bot logged work on BEAM-7642:


Author: ASF GitHub Bot
Created on: 22/Aug/19 01:20
Start Date: 22/Aug/19 01:20
Worklog Time Spent: 10m 
  Work Description: y1chi commented on issue #9397: [BEAM-7642] Fix python 
sdk AfterProcessingTime unit discrepancy
URL: https://github.com/apache/beam/pull/9397#issuecomment-523707960
 
 
   R: @angoenka @aaltay 
 



Issue Time Tracking
---

Worklog Id: (was: 299105)
Time Spent: 20m  (was: 10m)

> (Dataflow) Python AfterProcessingTime fires before defined time
> ---
>
> Key: BEAM-7642
> URL: https://issues.apache.org/jira/browse/BEAM-7642
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Mikhail Gryzykhin
>Assignee: Yichi Zhang
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> [https://stackoverflow.com/questions/56700913/why-my-data-in-apache-beam-is-emitted-after-a-few-minutes-instead-of-10h]
>  
> A user on StackOverflow reports that AfterProcessingTime on a global window 
> fires before the configured delay (10h). It emits events within the first 
> minutes/seconds and then drops the rest because the window has closed.
> The code the user provided seems valid. I tried to verify the time units 
> accepted by the method, but I couldn't trace them all the way through the code.





[jira] [Assigned] (BEAM-7642) (Dataflow) Python AfterProcessingTime fires before defined time

2019-08-21 Thread Yichi Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yichi Zhang reassigned BEAM-7642:
-

Assignee: Yichi Zhang

> (Dataflow) Python AfterProcessingTime fires before defined time
> ---
>
> Key: BEAM-7642
> URL: https://issues.apache.org/jira/browse/BEAM-7642
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Mikhail Gryzykhin
>Assignee: Yichi Zhang
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> [https://stackoverflow.com/questions/56700913/why-my-data-in-apache-beam-is-emitted-after-a-few-minutes-instead-of-10h]
>  
> A user on StackOverflow reports that AfterProcessingTime on a global window 
> fires before the configured delay (10h). It emits events within the first 
> minutes/seconds and then drops the rest because the window has closed.
> The code the user provided seems valid. I tried to verify the time units 
> accepted by the method, but I couldn't trace them all the way through the code.





[jira] [Work started] (BEAM-7642) (Dataflow) Python AfterProcessingTime fires before defined time

2019-08-21 Thread Yichi Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-7642 started by Yichi Zhang.
-
> (Dataflow) Python AfterProcessingTime fires before defined time
> ---
>
> Key: BEAM-7642
> URL: https://issues.apache.org/jira/browse/BEAM-7642
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Mikhail Gryzykhin
>Assignee: Yichi Zhang
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> [https://stackoverflow.com/questions/56700913/why-my-data-in-apache-beam-is-emitted-after-a-few-minutes-instead-of-10h]
>  
> A user on StackOverflow reports that AfterProcessingTime on a global window 
> fires before the configured delay (10h). It emits events within the first 
> minutes/seconds and then drops the rest because the window has closed.
> The code the user provided seems valid. I tried to verify the time units 
> accepted by the method, but I couldn't trace them all the way through the code.





[jira] [Work logged] (BEAM-7642) (Dataflow) Python AfterProcessingTime fires before defined time

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7642?focusedWorklogId=299104=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299104
 ]

ASF GitHub Bot logged work on BEAM-7642:


Author: ASF GitHub Bot
Created on: 22/Aug/19 01:17
Start Date: 22/Aug/19 01:17
Worklog Time Spent: 10m 
  Work Description: y1chi commented on pull request #9397: [BEAM-7642] Fix 
python sdk AfterProcessingTime unit discrepancy
URL: https://github.com/apache/beam/pull/9397
 
 
   It seems to be a bug in runner api AfterProcessingTime trigger translation. 
   
   

[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299101=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299101
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 22/Aug/19 01:01
Start Date: 22/Aug/19 01:01
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9188: [BEAM-7886] 
Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316460412
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -115,8 +115,7 @@ def get_version():
 'mock>=1.0.1,<3.0.0',
 'pymongo>=3.8.0,<4.0.0',
 'oauth2client>=2.0.1,<4',
-# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
-'protobuf>=3.5.0.post1,<4',
+'protobuf>=3.8.0.post1,<4',
 
 Review comment:
   Re: numpy
   
   That is ok. It needs to be an explicit dependency at this point really.
   
   You can remove the numpy dependency under test packages; please try to keep 
the same version range.
 



Issue Time Tracking
---

Worklog Id: (was: 299101)
Time Spent: 7h 50m  (was: 7h 40m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299093=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299093
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 22/Aug/19 00:15
Start Date: 22/Aug/19 00:15
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316452464
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -115,8 +115,7 @@ def get_version():
 'mock>=1.0.1,<3.0.0',
 'pymongo>=3.8.0,<4.0.0',
 'oauth2client>=2.0.1,<4',
-# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
-'protobuf>=3.5.0.post1,<4',
+'protobuf>=3.8.0.post1,<4',
 
 Review comment:
   :+1: done
   
   I also added a dependency on numpy. It shouldn't change anything since we 
already had the dependency transitively through pyarrow (and perhaps others?)
   
   Is that ok?
 



Issue Time Tracking
---

Worklog Id: (was: 299093)
Time Spent: 7h 40m  (was: 7.5h)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>






[jira] [Created] (BEAM-8030) Make VarIntCoder overflow behavior consistent

2019-08-21 Thread Brian Hulette (Jira)
Brian Hulette created BEAM-8030:
---

 Summary: Make VarIntCoder overflow behavior consistent
 Key: BEAM-8030
 URL: https://issues.apache.org/jira/browse/BEAM-8030
 Project: Beam
  Issue Type: Improvement
  Components: sdk-py-core
Reporter: Brian Hulette
Assignee: Brian Hulette


The fast version of OutputStream.write_var_int_64 (and thus VarIntCoder.encode) 
throws OverflowError for ints larger than 64 bits, but the slow version does 
not. We should make them both throw an error.

We may also want to add a write_var_int_32 that uses the same format, but will 
throw an error for ints larger than 32 bits, for use in RowCoder.
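
A rough sketch of the consistent behavior proposed above, written in pure Python. The function name and the `bits` parameter are hypothetical and are not the Beam OutputStream API; the sketch assumes the usual base-128 varint layout in which negative values are written via their 64-bit two's-complement form. The range check runs before any bytes are produced, so a fast and a slow path sharing it would fail the same way.

```python
def write_var_int(value, bits=64):
  """Encodes a signed int as a base-128 varint, rejecting out-of-range values.

  Hypothetical stand-in for write_var_int_64 (bits=64) and the proposed
  write_var_int_32 (bits=32); both use the same wire format.
  """
  lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
  if not lo <= value <= hi:
    raise OverflowError('%d does not fit in %d signed bits' % (value, bits))
  # Always sign-extend to 64 bits so the 32-bit variant shares the format.
  value &= (1 << 64) - 1
  out = bytearray()
  while True:
    byte = value & 0x7F
    value >>= 7
    if value:
      out.append(byte | 0x80)  # Continuation bit: more bytes follow.
    else:
      out.append(byte)
      return bytes(out)
```

For example, `write_var_int(1 << 64)` and `write_var_int(1 << 31, bits=32)` both raise OverflowError, while in-range values are encoded identically by either variant.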





[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299088=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299088
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 22/Aug/19 00:05
Start Date: 22/Aug/19 00:05
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316450487
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder.py
 ##
 @@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+  """
+
+  def __init__(self, schema):
+    self.schema = schema
+    self.components = [
+        coder_from_type(field.type) for field in self.schema.fields
+    ]
+
+  def _create_impl(self):
+    return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+    return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+    raise NotImplementedError("TODO")
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self.schema == other.schema
+
+  def __hash__(self):
+    return hash((type(self), self.schema.SerializePartialToString()))
+
+  def to_runner_api_parameter(self, unused_context):
+    return (common_urns.coders.ROW.urn, self.schema, [])
+
+  @staticmethod
+  def from_type_hint(named_tuple_type, registry):
+    return RowCoder(named_tuple_to_schema(named_tuple_type))
+
+
+def coder_from_type(type_):
+  type_info = type_.WhichOneof("type_info")
+  if type_info == "atomic_type":
+    if type_.atomic_type in (schema_pb2.AtomicType.INT32,
+                             schema_pb2.AtomicType.INT64):
+      return VarIntCoder()
+    elif type_.atomic_type == schema_pb2.AtomicType.DOUBLE:
+      return FloatCoder()
+    elif type_.atomic_type == schema_pb2.AtomicType.STRING:
+      return StrUtf8Coder()
+  elif type_info == "array_type":
+    return IterableCoder(coder_from_type(type_.array_type.element_type))
+
+  # The Java SDK supports several more types, but the coders are not yet
+  # standard, and are not implemented in Python.
+  raise ValueError(
+      "Encountered a type that is not currently supported by RowCoder: %s" %
+      type_)
+
+
+# pylint: disable=unused-variable
+
+@Coder.register_urn(common_urns.coders.ROW.urn, schema_pb2.Schema)
+def from_runner_api_parameter(payload, components, unused_context):
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 299088)
Time Spent: 7.5h  (was: 7h 20m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
>  

[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299087=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299087
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 22/Aug/19 00:04
Start Date: 22/Aug/19 00:04
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316450339
 
 

 ##
 File path: sdks/python/apache_beam/typehints/schemas.py
 ##
 @@ -0,0 +1,217 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Support for mapping python types to proto Schemas and back again.
+
+Python          Schema
+np.int8     <-> BYTE
+np.int16    <-> INT16
+np.int32    <-> INT32
+np.int64    <-> INT64
+int         ---/
+np.float32  <-> FLOAT
+np.float64  <-> DOUBLE
+float       ---/
+bool        <-> BOOLEAN
+
+The mappings for STRING and BYTES are different between python 2 and python 3,
+because of the changes to str:
+py3:
+  str/unicode <-> STRING
+  bytes       <-> BYTES
+  ByteString  ---/
+
+py2:
+  unicode     <-> STRING
+  str/bytes   ---/
+  ByteString  <-> BYTES
+"""
+
+from __future__ import absolute_import
+
+import sys
+from typing import ByteString
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+from uuid import uuid4
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.native_type_compatibility import _get_args
+from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
+from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple
+from apache_beam.typehints.native_type_compatibility import _match_is_optional
+from apache_beam.typehints.native_type_compatibility import _safe_issubclass
+from apache_beam.typehints.native_type_compatibility import extract_optional_type
+
+
+# Registry of typings for a schema by UUID
+class SchemaTypeRegistry(object):
+  def __init__(self):
+    self.by_id = {}
+    self.by_typing = {}
+
+  def add(self, typing, schema):
+    self.by_id[schema.id] = (typing, schema)
+
+  def get_typing_by_id(self, unique_id):
+    result = self.by_id.get(unique_id, None)
+    return result[0] if result is not None else None
+
+  def get_schema_by_id(self, unique_id):
+    result = self.by_id.get(unique_id, None)
+    return result[1] if result is not None else None
+
+
+SCHEMA_REGISTRY = SchemaTypeRegistry()
+
+
+# Bi-directional mappings
+_PRIMITIVES = (
+    (np.int8, schema_pb2.AtomicType.BYTE),
+    (np.int16, schema_pb2.AtomicType.INT16),
+    (np.int32, schema_pb2.AtomicType.INT32),
+    (np.int64, schema_pb2.AtomicType.INT64),
+    (np.float32, schema_pb2.AtomicType.FLOAT),
+    (np.float64, schema_pb2.AtomicType.DOUBLE),
+    (unicode, schema_pb2.AtomicType.STRING),
+    (bool, schema_pb2.AtomicType.BOOLEAN),
+    (bytes if sys.version_info.major >= 3 else ByteString,
+     schema_pb2.AtomicType.BYTES),
+)
+
+PRIMITIVE_TO_ATOMIC_TYPE = dict((typ, atomic) for typ, atomic in _PRIMITIVES)
+ATOMIC_TYPE_TO_PRIMITIVE = dict((atomic, typ) for typ, atomic in _PRIMITIVES)
+
+# One-way mappings
+PRIMITIVE_TO_ATOMIC_TYPE.update({
+    # In python 3, this is a no-op because str == unicode,
+    # but in python 2 it overrides the bytes -> BYTES mapping.
+    str: schema_pb2.AtomicType.STRING,
+    # In python 2, this is a no-op because we define it as the bi-directional
+    # mapping above. This just ensures the one-way mapping is defined in python
+    # 3.
+    ByteString: schema_pb2.AtomicType.BYTES,
+    # Allow users to specify a native int, and use INT64 as the cross-language
+    # representation. Technically ints have unlimited precision, but RowCoder
+    # should throw an error if it sees one with a bit width > 64 when encoding.
+    int: schema_pb2.AtomicType.INT64,
+    float: schema_pb2.AtomicType.DOUBLE,
+})
+})
+
+
+def typing_to_runner_api(type_):
+  if 
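
The two-table pattern in the hunk above (a bi-directional list of pairs plus one-way additions) can be tried out without Beam or NumPy. In this sketch the string names stand in for schema_pb2.AtomicType values and the `Int64` class stands in for np.int64; both are assumptions made only so the example runs on its own.

```python
class Int64(int):
  """Hypothetical stand-in for np.int64."""


# Bi-directional pairs: each Python type maps to a schema type and back.
_PRIMITIVES = (
    (Int64, 'INT64'),
    (bool, 'BOOLEAN'),
    (str, 'STRING'),
)

TYPE_TO_ATOMIC = dict(_PRIMITIVES)
ATOMIC_TO_TYPE = dict((atomic, typ) for typ, atomic in _PRIMITIVES)

# One-way additions: accepted on input, never produced on the way back.
TYPE_TO_ATOMIC.update({
    int: 'INT64',
})
```

A native `int` is accepted and mapped to INT64, but converting INT64 back yields the fixed-width type, so the round trip is deliberately lossy for the one-way entries.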

[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299085
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 22/Aug/19 00:03
Start Date: 22/Aug/19 00:03
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316450190
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder.py
 ##
 @@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+  """
+
+  def __init__(self, schema):
+self.schema = schema
+self.components = [
+coder_from_type(field.type) for field in self.schema.fields
+]
+
+  def _create_impl(self):
+return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+raise NotImplementedError("TODO")
+
+  def __eq__(self, other):
+return type(self) == type(other) and self.schema == other.schema
+
+  def __hash__(self):
+return hash((type(self), self.schema.SerializePartialToString()))
+
+  def to_runner_api_parameter(self, unused_context):
+return (common_urns.coders.ROW.urn, self.schema, [])
+
+  @staticmethod
+  def from_type_hint(named_tuple_type, registry):
+return RowCoder(named_tuple_to_schema(named_tuple_type))
+
+
+def coder_from_type(type_):
+  type_info = type_.WhichOneof("type_info")
+  if type_info == "atomic_type":
+if type_.atomic_type in (schema_pb2.AtomicType.INT32,
+ schema_pb2.AtomicType.INT64):
+  return VarIntCoder()
+elif type_.atomic_type == schema_pb2.AtomicType.DOUBLE:
+  return FloatCoder()
+elif type_.atomic_type == schema_pb2.AtomicType.STRING:
+  return StrUtf8Coder()
+  elif type_info == "array_type":
+return IterableCoder(coder_from_type(type_.array_type.element_type))
+
+  # The Java SDK supports several more types, but the coders are not yet
+  # standard, and are not implemented in Python.
+  raise ValueError(
+  "Encountered a type that is not currently supported by RowCoder: %s" %
+  type_)
+
+
+# pylint: disable=unused-variable
 
 Review comment:
   Whoops - this was a copy-paste error that I think was getting obscured by my 
code folding? Regardless, it's fixed now, thanks.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299085)
Time Spent: 7h 10m  (was: 7h)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: 

[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=299083=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299083
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 23:57
Start Date: 21/Aug/19 23:57
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316449066
 
 

 ##
 File path: 
sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.zetasketch;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for HLL++ sketch compatibility between Beam and BigQuery. 
The tests verifies
+ * that HLL++ sketches created in Beam can be processed by BigQuery, and vice 
versa.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryHllSketchCompatibilityIT {
+
+  private static final String DATASET_NAME = "zetasketch_compatibility_test";
+
+  // Table for testReadSketchFromBigQuery()
+  // Schema: only one STRING field named "data".
+  // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
+  private static final String DATA_TABLE_NAME = "hll_data";
+  private static final String DATA_FIELD_NAME = "data";
+  private static final String QUERY_RESULT_FIELD_NAME = "sketch";
+  private static final Long EXPECTED_COUNT = 3L;
+
+  // Table for testWriteSketchToBigQuery()
+  // Schema: only one BYTES field named "sketch".
+  // Content: will be overridden by the sketch computed by the test pipeline 
each time the test runs
+  private static final String SKETCH_TABLE_NAME = "hll_sketch";
+  private static final String SKETCH_FIELD_NAME = "sketch";
+  private static final List TEST_DATA =
+  Arrays.asList("Apple", "Orange", "Banana", "Orange");
+  // SHA-1 hash of string "[3]", the string representation of a row that has 
only one field 3 in it
+  private static final String EXPECTED_CHECKSUM = 
"f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+
+  /**
+   * Test that HLL++ sketch computed in BigQuery can be processed by Beam. Hll 
sketch is computed by
+   * {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the test verifies 
that we can run {@link
+   * HllCount.MergePartial} and {@link HllCount.Extract} on the sketch in Beam 
to get the correct
+   * estimated count.
+   */
+  @Test
+  public void testReadSketchFromBigQuery() {
 
 Review comment:
   
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryClient.java
 is the util class that could help BQ ITs to create/cleanup test data.
 


[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=299082=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299082
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 23:52
Start Date: 21/Aug/19 23:52
Worklog Time Spent: 10m 
  Work Description: robinyqiu commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316447983
 
 

 ##
 File path: 
sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.zetasketch;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for HLL++ sketch compatibility between Beam and BigQuery. 
The tests verifies
+ * that HLL++ sketches created in Beam can be processed by BigQuery, and vice 
versa.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryHllSketchCompatibilityIT {
+
+  private static final String DATASET_NAME = "zetasketch_compatibility_test";
+
+  // Table for testReadSketchFromBigQuery()
+  // Schema: only one STRING field named "data".
+  // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
+  private static final String DATA_TABLE_NAME = "hll_data";
+  private static final String DATA_FIELD_NAME = "data";
+  private static final String QUERY_RESULT_FIELD_NAME = "sketch";
+  private static final Long EXPECTED_COUNT = 3L;
+
+  // Table for testWriteSketchToBigQuery()
+  // Schema: only one BYTES field named "sketch".
+  // Content: will be overridden by the sketch computed by the test pipeline 
each time the test runs
+  private static final String SKETCH_TABLE_NAME = "hll_sketch";
+  private static final String SKETCH_FIELD_NAME = "sketch";
+  private static final List TEST_DATA =
+  Arrays.asList("Apple", "Orange", "Banana", "Orange");
+  // SHA-1 hash of string "[3]", the string representation of a row that has 
only one field 3 in it
+  private static final String EXPECTED_CHECKSUM = 
"f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+
+  /**
+   * Test that HLL++ sketch computed in BigQuery can be processed by Beam. Hll 
sketch is computed by
+   * {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the test verifies 
that we can run {@link
+   * HllCount.MergePartial} and {@link HllCount.Extract} on the sketch in Beam 
to get the correct
+   * estimated count.
+   */
+  @Test
+  public void testReadSketchFromBigQuery() {
 
 Review comment:
   This is a very good point. I agree with you that ideally a test should be 
self-contained and not depend on any external resources. The reason why I did 
that is simply because the other BigQueryIO integration tests under this 

[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299075=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299075
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 23:43
Start Date: 21/Aug/19 23:43
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316446239
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
 ##
 @@ -177,10 +177,7 @@ public String getUrn(ParDoSingle transform) {
   // TODO: Is there a better way to do this?
   Set allInputs =
   
transform.getInputs().keySet().stream().map(TupleTag::getId).collect(Collectors.toSet());
-  Set sideInputs =
-  parDo.getSideInputs().stream()
-  .map(s -> s.getTagInternal().getId())
-  .collect(Collectors.toSet());
+  Set sideInputs = parDo.getSideInputs().keySet();
 
 Review comment:
   This is what caused the DataflowRunner failure before.
 



Issue Time Tracking
---

Worklog Id: (was: 299075)
Time Spent: 5h 10m  (was: 5h)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  
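
The parameter analysis described in the issue above can be illustrated with a small Python sketch. This is an analogy only: per the description, Beam's Java SDK analyzes the process method and uses ByteBuddy code generation, whereas the sketch below uses `typing.Annotated` (Python 3.9+) and plain reflection. `SideInput`, `analyze_process_fn`, and `invoke_process` are hypothetical names introduced for illustration and are not Beam APIs.

```python
from typing import Annotated, get_args, get_origin, get_type_hints


class SideInput:
    """Hypothetical marker naming the side input to inject (not a Beam class)."""

    def __init__(self, tag):
        self.tag = tag


def analyze_process_fn(process_fn):
    """Return {parameter name: (tag, expected type)} for marked parameters."""
    wanted = {}
    hints = get_type_hints(process_fn, include_extras=True)
    for name, hint in hints.items():
        if get_origin(hint) is not Annotated:
            continue
        expected_type, *metadata = get_args(hint)
        for marker in metadata:
            if isinstance(marker, SideInput):
                wanted[name] = (marker.tag, expected_type)
    return wanted


def invoke_process(process_fn, element, side_input_values):
    """Call process_fn, injecting side inputs by tag after a type check."""
    kwargs = {}
    for name, (tag, expected_type) in analyze_process_fn(process_fn).items():
        value = side_input_values[tag]
        if not isinstance(value, expected_type):
            raise TypeError(
                "side input %r: expected %s, got %s"
                % (tag, expected_type.__name__, type(value).__name__))
        kwargs[name] = value
    return process_fn(element, **kwargs)


# The user only declares what they need; no explicit side-input list is passed.
def process(element,
            input1: Annotated[str, SideInput("tag1")],
            input2: Annotated[int, SideInput("tag2")]):
    return "%s/%s/%d" % (element, input1, input2)
```

Because the set of required side inputs is derived from the signature, the caller does not need a separate registration step, which mirrors the issue's goal of removing the explicit withSideInputs call.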



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=299074=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299074
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 23:43
Start Date: 21/Aug/19 23:43
Worklog Time Spent: 10m 
  Work Description: robinyqiu commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316446092
 
 

 ##
 File path: sdks/java/extensions/zetasketch/build.gradle
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: ZetaSketch"
+
+def zetasketch_version = "0.1.0"
+
+dependencies {
+compile library.java.vendored_guava_26_0_jre
+compile project(path: ":sdks:java:core", configuration: "shadow")
+compile "com.google.zetasketch:zetasketch:$zetasketch_version"
+testCompile library.java.junit
+testCompile project(":sdks:java:io:google-cloud-platform")
+testRuntimeOnly project(":runners:direct-java")
+testRuntimeOnly project(":runners:google-cloud-dataflow-java")
+}
+
+/**
+ * Integration tests running on Dataflow with BigQuery.
+ */
+task integrationTest(type: Test) {
+group = "Verification"
+def gcpProject = project.findProperty('gcpProject') ?: 
'apache-beam-testing'
+def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests'
+systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+"--runner=TestDataflowRunner",
+"--project=${gcpProject}",
+"--tempRoot=${gcpTempRoot}",
+])
 
 Review comment:
   Thanks for the pointer. Will do.
 



Issue Time Tracking
---

Worklog Id: (was: 299074)
Time Spent: 26h  (was: 25h 50m)

> A new count distinct transform based on BigQuery compatible HyperLogLog++ 
> implementation
> 
>
> Key: BEAM-7013
> URL: https://issues.apache.org/jira/browse/BEAM-7013
> Project: Beam
>  Issue Type: New Feature
>  Components: extensions-java-sketching, sdk-java-core
>Reporter: Yueyang Qiu
>Assignee: Yueyang Qiu
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 26h
>  Remaining Estimate: 0h
>






[jira] [Resolved] (BEAM-7834) warn user when --zone flag set

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver resolved BEAM-7834.
---
Fix Version/s: Not applicable
   Resolution: Won't Fix

> warn user when --zone flag set
> --
>
> Key: BEAM-7834
> URL: https://issues.apache.org/jira/browse/BEAM-7834
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
> Fix For: Not applicable
>
>
> We now warn when region is unset. There are cases when the user has a 
> legitimate need to set zone, so I'm marking this as won't fix.





[jira] [Updated] (BEAM-7834) warn user when --zone flag set

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver updated BEAM-7834:
--
Description: We now warn when region is unset. There are cases when the 
user has a legitimate need to set zone, so I'm marking this as won't fix.

> warn user when --zone flag set
> --
>
> Key: BEAM-7834
> URL: https://issues.apache.org/jira/browse/BEAM-7834
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-dataflow
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>
> We now warn when region is unset. There are cases when the user has a 
> legitimate need to set zone, so I'm marking this as won't fix.





[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=299073=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299073
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 23:33
Start Date: 21/Aug/19 23:33
Worklog Time Spent: 10m 
  Work Description: robinyqiu commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316444192
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java
 ##
 @@ -63,32 +63,52 @@
 
   private final String projectId;
   private final String query;
+  private final boolean usingStandardSql;
 
 Review comment:
   I have considered this. But in our Beam `BigQueryIO` source we also have
   
https://github.com/apache/beam/blob/08d0146791e38be4641ff80ffb2539cdc81f5b6d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L600
   
   Since this is a public API visible to Beam users but the BQ client is not, I 
decided to be consistent with the former (and therefore I have to negate the 
boolean somewhere in the function call stack).
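
A minimal sketch of the negation mentioned above, assuming the linked BigQueryIO line refers to its `usingStandardSql` option and that the underlying query request carries the BigQuery REST field `useLegacySql`. The function name `build_query_request` is hypothetical and is not part of BigqueryMatcher.

```python
def build_query_request(query, using_standard_sql=False):
    """Build a query request body from a user-facing 'standard SQL' flag.

    The user-facing flag is positive ("use standard SQL"), while the request
    field is expressed the other way round ("use legacy SQL"), so the value
    is negated exactly once, at the boundary.
    """
    return {
        "query": query,
        "useLegacySql": not using_standard_sql,
    }
```

Negating in one place keeps the public flag consistent with the user-visible API while the client-facing field keeps its own polarity.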
 



Issue Time Tracking
---

Worklog Id: (was: 299073)
Time Spent: 25h 50m  (was: 25h 40m)

> A new count distinct transform based on BigQuery compatible HyperLogLog++ 
> implementation
> 
>
> Key: BEAM-7013
> URL: https://issues.apache.org/jira/browse/BEAM-7013
> Project: Beam
>  Issue Type: New Feature
>  Components: extensions-java-sketching, sdk-java-core
>Reporter: Yueyang Qiu
>Assignee: Yueyang Qiu
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 25h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7711) Support DATETIME as a logical type in BeamSQL

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7711?focusedWorklogId=299071=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299071
 ]

ASF GitHub Bot logged work on BEAM-7711:


Author: ASF GitHub Bot
Created on: 21/Aug/19 23:32
Start Date: 21/Aug/19 23:32
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #8994: [BEAM-7711] 
Add DATETIME as a logical type in BeamSQL
URL: https://github.com/apache/beam/pull/8994#discussion_r316443921
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 ##
 @@ -154,6 +163,7 @@ public static boolean isStringType(FieldType fieldType) {
   .put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
   .put(TIMESTAMP, SqlTypeName.TIMESTAMP)
   .put(TIMESTAMP_WITH_LOCAL_TZ, 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+  .put(DATETIME, SqlTypeName.TIMESTAMP)
 
 Review comment:
   Yes. This mapping is not correct.
   
   The discussion here is about which mapping we should use for DATETIME. If you check 
SqlTypeName, you will find that no TIMESTAMP WITHOUT TIME ZONE (or 
equivalent) type exists. 
 



Issue Time Tracking
---

Worklog Id: (was: 299071)
Time Spent: 2h 50m  (was: 2h 40m)

> Support DATETIME as a logical type in BeamSQL
> -
>
> Key: BEAM-7711
> URL: https://issues.apache.org/jira/browse/BEAM-7711
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Rui Wang
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> DATETIME as a type represents a year, month, day, hour, minute, second, and 
> subsecond (millis).
> It ranges from 0001-01-01 00:00:00 to -12-31 23:59:59.999.





[jira] [Created] (BEAM-8029) Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation

2019-08-21 Thread Chris Larsen (Jira)
Chris Larsen created BEAM-8029:
--

 Summary: Using BigQueryIO.read with DIRECT_READ causes Illegal 
Mutation 
 Key: BEAM-8029
 URL: https://issues.apache.org/jira/browse/BEAM-8029
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Affects Versions: 2.14.0
Reporter: Chris Larsen


 

Code to read from BigQuery that is causing the issue:
{code:java}
pipeline
    .apply(BigQueryIO
    .read(SchemaAndRecord::getRecord)
    .from(options.getTableRef())
    .withMethod(Method.DIRECT_READ)
    .withCoder(AvroCoder.of(schema)))
{code}
If we remove .withMethod(Method.DIRECT_READ) then there is no issue.

 

The error is:
{code:java}
org.apache.beam.sdk.util.IllegalMutationException: PTransform 
BigQueryIO.TypedRead/Read(BigQueryStorageTableSource) mutated value 
{"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 
52.0, "sample_time": 1564412307969368, "humidity": 74.3} after it was output 
(new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, 
"temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}). 
Values must not be mutated in any way after being output.
at 
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit
 (ImmutabilityCheckingBundleFactory.java:134)
at org.apache.beam.runners.direct.EvaluationContext.commitBundles 
(EvaluationContext.java:210)
at org.apache.beam.runners.direct.EvaluationContext.handleResult 
(EvaluationContext.java:151)
at 
org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult
 (QuiescenceDriver.java:262)
at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle 
(DirectTransformExecutor.java:189)
at org.apache.beam.runners.direct.DirectTransformExecutor.run 
(DirectTransformExecutor.java:126)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker 
(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run 
(ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value 
{"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 
52.0, "sample_time": 1564412307969368, "humidity": 74.3} mutated illegally, new 
value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, 
"temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}. 
Encoding was 
AiZycGktcnBpMC10aGVybW9zdGF0AgAAADRAAgAAAEpAArDVsP7jtMcFAjMzMzMzk1JA, 
now 
AiZycGktcnBpMC10aGVybW9zdGF0AgAAADRAAgAAAEpAAu6FuLDktMcFAs3MzMzMrFJA.
at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation
 (MutationDetectors.java:153)
at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions
 (MutationDetectors.java:148)
at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified
 (MutationDetectors.java:123)
at 
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit
 (ImmutabilityCheckingBundleFactory.java:124)
at org.apache.beam.runners.direct.EvaluationContext.commitBundles 
(EvaluationContext.java:210)
at org.apache.beam.runners.direct.EvaluationContext.handleResult 
(EvaluationContext.java:151)
at 
org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult
 (QuiescenceDriver.java:262)
at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle 
(DirectTransformExecutor.java:189)
at org.apache.beam.runners.direct.DirectTransformExecutor.run 
(DirectTransformExecutor.java:126)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker 
(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run 
(ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748){code}
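
The stack trace above shows the check being made by a coded-value mutation detector: the error reports the encoding at output time ("Encoding was ...") and the encoding at commit time ("now ..."). A rough Python sketch of that kind of check follows. It is not the DirectRunner's implementation, and the reader that reuses one record object is only an assumed example of how such an error can arise, not a diagnosis of this report. All names are hypothetical.

```python
import json


def encode(record):
    """Stand-in for a coder: a deterministic byte encoding of a record."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


class CodedValueMutationDetector:
    """Remembers a value's encoding at output time and re-checks it later."""

    def __init__(self, value):
        self._value = value
        self._encoded_at_output = encode(value)

    def verify_unmodified(self):
        encoded_now = encode(self._value)
        if encoded_now != self._encoded_at_output:
            raise ValueError(
                "Value mutated illegally. Encoding was %r, now %r."
                % (self._encoded_at_output, encoded_now))


def read_reusing_record(rows):
    """Hypothetical reader that refills one record object for every row."""
    record = {}
    for row in rows:
        record.clear()
        record.update(row)
        yield record  # every output is the same, later-mutated object


def read_copying_record(rows):
    """Hypothetical reader that emits a fresh object per row."""
    for row in rows:
        yield dict(row)


def run_bundle(reader, rows):
    """Output every value, then verify all of them when the bundle commits."""
    detectors = [CodedValueMutationDetector(value) for value in reader(rows)]
    for detector in detectors:
        detector.verify_unmodified()
    return len(detectors)
```

With two rows that differ in `sample_time` and `humidity`, `run_bundle(read_reusing_record, rows)` raises because the first output now encodes as the second row, while `run_bundle(read_copying_record, rows)` passes.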
 





[jira] [Work logged] (BEAM-7856) BigQuery table creation race condition error when executing pipeline on multiple workers

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7856?focusedWorklogId=299060=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299060
 ]

ASF GitHub Bot logged work on BEAM-7856:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:57
Start Date: 21/Aug/19 22:57
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #9396: [BEAM-7856] Re 
Raise exception for code other than 409
URL: https://github.com/apache/beam/pull/9396#issuecomment-523680815
 
 
   LGTM. Thanks.
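
   For readers following along, the behaviour named in the PR title (tolerate HTTP 409, re-raise anything else) can be sketched as below. This is an illustration under stated assumptions, not the code in the PR: `HttpError`, `InMemoryTableService`, `RacingTableService`, and `get_or_create_table` are hypothetical names, and the in-memory service only imitates a table API that answers 404 for a missing table and 409 for a conflicting create.

```python
class HttpError(Exception):
    """Hypothetical stand-in for a client error carrying an HTTP status code."""

    def __init__(self, status_code, message=""):
        super().__init__(message)
        self.status_code = status_code


class InMemoryTableService:
    """Hypothetical table service: 404 on missing get, 409 on duplicate insert."""

    def __init__(self):
        self.tables = {}

    def get(self, name):
        if name not in self.tables:
            raise HttpError(404, "table not found: %s" % name)
        return self.tables[name]

    def insert(self, name, schema):
        if name in self.tables:
            raise HttpError(409, "table already exists: %s" % name)
        self.tables[name] = schema
        return schema


class RacingTableService(InMemoryTableService):
    """Simulates another worker creating the table just before our insert."""

    def insert(self, name, schema):
        self.tables.setdefault(name, schema)  # the other worker wins the race
        return super().insert(name, schema)   # so our insert sees a 409


def get_or_create_table(service, name, schema):
    """Check-then-create that tolerates a concurrent create by another worker."""
    try:
        return service.get(name)
    except HttpError as exn:
        if exn.status_code != 404:
            raise
    try:
        return service.insert(name, schema)
    except HttpError as exn:
        if exn.status_code == 409:
            # Conflict: the table appeared between our get and our insert.
            return service.get(name)
        raise  # any other status code is a real failure
```

   Treating only 409 as benign keeps the race harmless without hiding other failures such as permission or quota errors.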
   
 



Issue Time Tracking
---

Worklog Id: (was: 299060)
Time Spent: 1h 50m  (was: 1h 40m)

> BigQuery table creation race condition error when executing pipeline on 
> multiple workers
> 
>
> Key: BEAM-7856
> URL: https://issues.apache.org/jira/browse/BEAM-7856
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Reporter: Ankur Goenka
>Assignee: Ankur Goenka
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> This is a non-fatal issue and just prints an error in the logs, as far as I can 
> tell.
> The issue occurs when we check for and create a BigQuery table on multiple workers at 
> the same time. This causes the race condition.
>  
> {noformat}
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 157, in _execute
>   response = task()
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 190, in
>   self._execute(lambda: worker.do_instruction(work), work)
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 342, in do_instruction
>   request.instruction_id)
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 368, in process_bundle
>   bundle_processor.process_bundle(instruction_id))
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 593, in process_bundle
>   data.ptransform_id].process_encoded(data.data)
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>   self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
>   def output(self, windowed_value, output_index=0):
> File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
>   cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   self.consumer.process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>   with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>   delayed_application = self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>   self.process(windowed_value)
> File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>   self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 857, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   raise
> File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>   return self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   self._invoke_process_per_window(
> File "apache_beam/runners/common.py", line 682, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 903, in apache_beam.runners.common._OutputProcessor.process_outputs
>   def process_outputs(self, windowed_input_element, results):
> File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>   self.main_receivers.receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 143, in
> 

[jira] [Work logged] (BEAM-8007) Update Python dependencies page for 2.15.0

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8007?focusedWorklogId=299059=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299059
 ]

ASF GitHub Bot logged work on BEAM-8007:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:56
Start Date: 21/Aug/19 22:56
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #9394: [BEAM-8007] Document 
SDK 2.15.0 Python dependencies
URL: https://github.com/apache/beam/pull/9394#issuecomment-523680647
 
 
   LGTM, thanks.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299059)
Remaining Estimate: 0h
Time Spent: 10m

> Update Python dependencies page for 2.15.0
> --
>
> Key: BEAM-8007
> URL: https://issues.apache.org/jira/browse/BEAM-8007
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: Cyrus Maden
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Update Python dependencies page for 2.15.0
> [https://beam.apache.org/documentation/sdks/python-dependencies/]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299057
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:56
Start Date: 21/Aug/19 22:56
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316435823
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder.py
 ##
 @@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+  """
+
+  def __init__(self, schema):
+    self.schema = schema
+    self.components = [
+        coder_from_type(field.type) for field in self.schema.fields
+    ]
+
+  def _create_impl(self):
+    return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+    return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+    raise NotImplementedError("TODO")
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self.schema == other.schema
+
+  def __hash__(self):
 
 Review comment:
   Yeah, the reason is that `self.schema` isn't hashable. I tried to follow the 
model of the structured coders that hash their element coders, but I had to 
serialize the schema to make it work.
   
   It will be hashable whether or not this is defined, though, since `Coder` 
includes:
   
   ```
   def __hash__(self):
     return hash(type(self))
   ```
   
   I suppose it's fine to just let the collisions happen and rely on `__eq__`, 
though? 
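
To make the trade-off in this comment concrete, here is a minimal sketch of what "let the collisions happen and rely on `__eq__`" means in practice. `TypeHashedCoder` is a hypothetical stand-in, not Beam's `Coder` or `RowCoder`, and a plain list stands in for the unhashable schema. It only illustrates standard Python hashing behaviour: equal hashes send objects to the same bucket, and `__eq__` then decides whether they are the same key.

```python
class TypeHashedCoder(object):
  """Hypothetical stand-in for a coder; not Beam's actual Coder class."""

  def __init__(self, schema):
    # A list stands in for an unhashable schema object (assumption).
    self.schema = schema

  def __eq__(self, other):
    return type(self) == type(other) and self.schema == other.schema

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__.
    return not self == other

  def __hash__(self):
    # Every instance of the same type gets the same hash value.
    return hash(type(self))


a = TypeHashedCoder(["id", "name"])
b = TypeHashedCoder(["id", "name"])
c = TypeHashedCoder(["id"])

lookup = {a: "first"}
lookup[c] = "second"   # collides with a, but __eq__ keeps it a separate key
found_b = lookup[b]    # b equals a, so it resolves to a's entry
size = len(lookup)
```

Lookups stay correct under this scheme; the cost is that a dict or set holding many such objects degrades toward a linear scan, because every key shares one hash value.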
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299057)
Time Spent: 6h 50m  (was: 6h 40m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299056=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299056
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:56
Start Date: 21/Aug/19 22:56
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316435823
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder.py
 ##
 @@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+  """
+
+  def __init__(self, schema):
+    self.schema = schema
+    self.components = [
+        coder_from_type(field.type) for field in self.schema.fields
+    ]
+
+  def _create_impl(self):
+    return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+    return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+    raise NotImplementedError("TODO")
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self.schema == other.schema
+
+  def __hash__(self):
 
 Review comment:
   Yeah, the reason is that `self.schema` isn't hashable. I tried to follow the 
model of the structured coders that hash their element coders, but I had to 
serialize the schema to make it work.
   
   It will be hashable whether or not this is defined, though, since `Coder` 
includes:
   
   ```
   def __hash__(self):
     return hash(type(self))
   ```
   
   I suppose it's fine to just let the collisions happen and rely on `__eq__`, 
though? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299056)
Time Spent: 6h 40m  (was: 6.5h)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299058
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:56
Start Date: 21/Aug/19 22:56
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316435823
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder.py
 ##
 @@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+  """
+
+  def __init__(self, schema):
+    self.schema = schema
+    self.components = [
+        coder_from_type(field.type) for field in self.schema.fields
+    ]
+
+  def _create_impl(self):
+    return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+    return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+    raise NotImplementedError("TODO")
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self.schema == other.schema
+
+  def __hash__(self):
 
 Review comment:
   Yeah, the reason is that `self.schema` isn't hashable. I tried to follow the 
model of the structured coders that hash their element coders, but I had to 
serialize the schema to make it work.
   
   It will be hashable whether or not this is defined, though, since `Coder` 
includes:
   
   ```python
   def __hash__(self):
     return hash(type(self))
   ```
   
   I suppose it's fine to just let the collisions happen and rely on `__eq__`, 
though? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299058)
Time Spent: 7h  (was: 6h 50m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7856) BigQuery table creation race condition error when executing pipeline on multiple workers

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7856?focusedWorklogId=299054=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299054
 ]

ASF GitHub Bot logged work on BEAM-7856:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:55
Start Date: 21/Aug/19 22:55
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #9396: [BEAM-7856] Re Raise 
exception for code other than 409
URL: https://github.com/apache/beam/pull/9396#issuecomment-523680327
 
 
   R: @chamikaramj 
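
For readers without the diff at hand, the behaviour named in the PR title (tolerate a 409 when creating the table, re-raise anything else) might look roughly like the sketch below. This is not the code from the pull request: `HttpError`, `get_or_create_table`, `create_fn` and `get_fn` are hypothetical names chosen for illustration, and the real BigQuery client and its exception type are deliberately not modelled. The only external fact relied on is that HTTP 409 means "Conflict", which is what a create call returns when another worker has already created the resource.

```python
class HttpError(Exception):
  """Hypothetical stand-in for an HTTP client error with a status code."""

  def __init__(self, status_code):
    super(HttpError, self).__init__("HTTP %d" % status_code)
    self.status_code = status_code


def get_or_create_table(create_fn, get_fn):
  """Creates a table, tolerating concurrent creation by another worker.

  create_fn and get_fn are hypothetical zero-argument callables that
  create and fetch the table, respectively.
  """
  try:
    return create_fn()
  except HttpError as exn:
    if exn.status_code == 409:
      # Another worker won the race; the table exists, so fetch it.
      return get_fn()
    # Any other status is a genuine failure and must not be swallowed.
    raise
```

Treating only 409 as benign keeps the race harmless while still surfacing permission, quota, or schema errors to the caller.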
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299054)
Time Spent: 1h 40m  (was: 1.5h)

> BigQuery table creation race condition error when executing pipeline on 
> multiple workers
> 
>
> Key: BEAM-7856
> URL: https://issues.apache.org/jira/browse/BEAM-7856
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Reporter: Ankur Goenka
>Assignee: Ankur Goenka
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> This is non-fatal issue and just prints error in the logs as far as I can 
> tell.
> The issue is when we check and create big query table on multiple workers at 
> the same time. This causes the race condition.
>  
> {noformat}
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 157, in _execute
>   response = task()
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 190, in
>   self._execute(lambda: worker.do_instruction(work), work)
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 342, in do_instruction
>   request.instruction_id)
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 368, in process_bundle
>   bundle_processor.process_bundle(instruction_id))
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 593, in process_bundle
>   data.ptransform_id].process_encoded(data.data)
> File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>   self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
>   def output(self, windowed_value, output_index=0):
> File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
>   cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   self.consumer.process(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>   with self.scoped_process_state:
> File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>   delayed_application = self.dofn_receiver.receive(o)
> File "apache_beam/runners/common.py", line 799, in apache_beam.runners.common.DoFnRunner.receive
>   self.process(windowed_value)
> File "apache_beam/runners/common.py", line 805, in apache_beam.runners.common.DoFnRunner.process
>   self._reraise_augmented(exn)
> File "apache_beam/runners/common.py", line 857, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   raise
> File "apache_beam/runners/common.py", line 803, in apache_beam.runners.common.DoFnRunner.process
>   return self.do_fn_invoker.invoke_process(windowed_value)
> File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   self._invoke_process_per_window(
> File "apache_beam/runners/common.py", line 682, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   output_processor.process_outputs(
> File "apache_beam/runners/common.py", line 903, in apache_beam.runners.common._OutputProcessor.process_outputs
>   def process_outputs(self, windowed_input_element, results):
> File "apache_beam/runners/common.py", line 942, in apache_beam.runners.common._OutputProcessor.process_outputs
>   self.main_receivers.receive(windowed_value)
> File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive

[jira] [Work logged] (BEAM-7856) BigQuery table creation race condition error when executing pipeline on multiple workers

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7856?focusedWorklogId=299053=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299053
 ]

ASF GitHub Bot logged work on BEAM-7856:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:54
Start Date: 21/Aug/19 22:54
Worklog Time Spent: 10m 
  Work Description: angoenka commented on pull request #9396: [BEAM-7856] 
Re Raise exception for code other than 409
URL: https://github.com/apache/beam/pull/9396
 
 
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 

[jira] [Work logged] (BEAM-8018) Detect unexported fields in unregistered types for better error messages

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8018?focusedWorklogId=299052=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299052
 ]

ASF GitHub Bot logged work on BEAM-8018:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:53
Start Date: 21/Aug/19 22:53
Worklog Time Spent: 10m 
  Work Description: youngoli commented on pull request #9387: [BEAM-8018] 
Detect unexported fields earlier
URL: https://github.com/apache/beam/pull/9387#discussion_r316434083
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/serialize_test.go
 ##
 @@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+   "reflect"
+   "strings"
+   "testing"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+)
+
+func TestEncodeType(t *testing.T) {
+   t.Run("NoUnexportedFields", func(t *testing.T) {
+   type MyAwesomeType struct {
+   ExportedField string
+   }
+   rt := reflect.TypeOf((*MyAwesomeType)(nil)).Elem()
+
+   pbT, err := encodeType(rt)
+   if err != nil {
+   t.Fatalf("got error = %v, want nil", err)
+   }
+   if got, want := pbT.Kind, v1.Type_STRUCT; got != want {
+   t.Fatalf("got pbT.Kin == %v, want %v", got, want)
+   }
+   })
+   t.Run("UnregisteredWithUnexportedField", func(t *testing.T) {
+   type MyProblematicType struct {
+   unexportedField string
+   }
+   rt := reflect.TypeOf((*MyProblematicType)(nil)).Elem()
+   pbT, err := encodeType(rt)
+   if err == nil {
+   t.Fatalf("got type = %v, nil, want unexported field error", pbT)
+   }
+   if !strings.Contains(err.Error(), "type has unexported field: unexportedField") {
+   t.Errorf("expected error about unexported field, got %q", err.Error())
+   }
+   })
+   t.Run("RegisteredWithUnexportedField", func(t *testing.T) {
+   type MyRegisteredType struct {
+   unexportedField string
+   }
+   rt := reflect.TypeOf((*MyRegisteredType)(nil)).Elem()
+   runtime.RegisterType(rt)
+   pbT, err := encodeType(rt)
+   if err != nil {
+   t.Fatalf("got error = %v, want nil", err)
+   }
+   if got, want := pbT.Kind, v1.Type_EXTERNAL; got != want {
+   t.Fatalf("got pbT.Kin == %v, want %v", got, want)
 
 Review comment:
   Same typo as above.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299052)
Time Spent: 40m  (was: 0.5h)

> Detect unexported fields in unregistered types for better error messages
> 
>
> Key: BEAM-8018
> URL: https://issues.apache.org/jira/browse/BEAM-8018
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Robert Burke
>Assignee: Robert Burke
>Priority: Minor
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Unregistered types with unexported fields cause pipeline execution time 
> panics rather than construction time panics/errors, which would be preferable.
> Improved error messages encouraging users to register their types already 
> exists, but isn't currently triggered in this instance, so it's a matter of 
> returning an error in this case.
> eg. for using time.Time as a value in a DoFn...
> panic: 
> 
> 

[jira] [Work logged] (BEAM-8018) Detect unexported fields in unregistered types for better error messages

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8018?focusedWorklogId=299051=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299051
 ]

ASF GitHub Bot logged work on BEAM-8018:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:53
Start Date: 21/Aug/19 22:53
Worklog Time Spent: 10m 
  Work Description: youngoli commented on pull request #9387: [BEAM-8018] 
Detect unexported fields earlier
URL: https://github.com/apache/beam/pull/9387#discussion_r316431985
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/serialize_test.go
 ##
 @@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package graphx
+
+import (
+   "reflect"
+   "strings"
+   "testing"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+)
+
+func TestEncodeType(t *testing.T) {
+   t.Run("NoUnexportedFields", func(t *testing.T) {
+   type MyAwesomeType struct {
+   ExportedField string
+   }
+   rt := reflect.TypeOf((*MyAwesomeType)(nil)).Elem()
+
+   pbT, err := encodeType(rt)
+   if err != nil {
+   t.Fatalf("got error = %v, want nil", err)
+   }
+   if got, want := pbT.Kind, v1.Type_STRUCT; got != want {
+   t.Fatalf("got pbT.Kin == %v, want %v", got, want)
 
 Review comment:
   Nit: Typo.
   ```suggestion
t.Fatalf("got pbT.Kind == %v, want %v", got, want)
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 299051)
Time Spent: 0.5h  (was: 20m)

> Detect unexported fields in unregistered types for better error messages
> 
>
> Key: BEAM-8018
> URL: https://issues.apache.org/jira/browse/BEAM-8018
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Robert Burke
>Assignee: Robert Burke
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Unregistered types with unexported fields cause pipeline-execution-time 
> panics rather than construction-time panics/errors, which would be preferable.
> Improved error messages encouraging users to register their types already 
> exist, but aren't currently triggered in this instance, so it's a matter of 
> returning an error in this case.
> e.g. when using time.Time as a value in a DoFn:
> panic: 
> 
> reflect.StructOf: StructOf does not allow unexported fields [recovered]
>         panic: reflect.StructOf: StructOf does not allow unexported fields
> goroutine 195 [running]:
> panic(0x7e0a060, 0x84beb70)
>         third_party/go/gc/src/runtime/panic.go:567 +0x2da fp=0xc000dc1178 
> sp=0xc000dc10c0 pc=0xee24daa
> testing.tRunner.func1(0xc000d6ec00)
>         third_party/go/gc/src/testing/testing.go:830 +0x388 fp=0xc000dc11f8 
> sp=0xc000dc1178 pc=0xfafda58
> runtime.call32(0x0, 0x833e1e0, 0xc000caaab0, 0x80008)
>         third_party/go/gc/src/runtime/asm_amd64.s:519 +0x3b fp=0xc000dc1228 
> sp=0xc000dc11f8 pc=0xee53acb
> panic(0x7e0a060, 0x84beb70)
>         third_party/go/gc/src/runtime/panic.go:522 +0x1b5 fp=0xc000dc12e0 
> sp=0xc000dc1228 pc=0xee24c85
> reflect.runtimeStructField(0xc000e267e0, 0x4, 0xc000e267e4, 0x4, 0x8541880, 
> 0x7e0a060, 0x0, 0x0, 0x0, 0xc000e26990, ...)
>         third_party/go/gc/src/reflect/type.go:2765 +0x1c2 fp=0xc000dc1348 
> sp=0xc000dc12e0 pc=0xee80ea2
> reflect.StructOf(0xc0006dd040, 0x3, 0x4, 0x0, 0x0)
>         third_party/go/gc/src/reflect/type.go:2371 +0x21f6 fp=0xc000dc1b28 
> sp=0xc000dc1348 pc=0xee7f816
> 

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299048=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299048
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:31
Start Date: 21/Aug/19 22:31
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 299048)
Time Spent: 5h 10m  (was: 5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7600) Spark portable runner: reuse SDK harness

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7600?focusedWorklogId=299045=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299045
 ]

ASF GitHub Bot logged work on BEAM-7600:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:21
Start Date: 21/Aug/19 22:21
Worklog Time Spent: 10m 
  Work Description: ibzib commented on issue #9095: [BEAM-7600] borrow SDK 
harness management code into Spark runner
URL: https://github.com/apache/beam/pull/9095#issuecomment-523672131
 
 
   Run Java Spark PortableValidatesRunner Batch
 



Issue Time Tracking
---

Worklog Id: (was: 299045)
Time Spent: 5h 20m  (was: 5h 10m)

> Spark portable runner: reuse SDK harness
> 
>
> Key: BEAM-7600
> URL: https://issues.apache.org/jira/browse/BEAM-7600
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>  Labels: portability-spark
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Right now, we're creating a new SDK harness every time an executable stage is 
> run [1], which is expensive. We should be able to re-use code from the Flink 
> runner to re-use the SDK harness [2].
>  
> [1] 
> [https://github.com/apache/beam/blob/c9fb261bc7666788402840bb6ce1b0ce2fd445d1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java#L135]
> [2] 
> [https://github.com/apache/beam/blob/c9fb261bc7666788402840bb6ce1b0ce2fd445d1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java]





[jira] [Resolved] (BEAM-7531) Don't auto-create GCS buckets with KMS settings

2019-08-21 Thread Udi Meiri (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri resolved BEAM-7531.
-
Fix Version/s: Not applicable
   Resolution: Fixed

> Don't auto-create GCS buckets with KMS settings
> ---
>
> Key: BEAM-7531
> URL: https://issues.apache.org/jira/browse/BEAM-7531
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, io-py-gcp
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> For Dataflow jobs, if a staging/temp path is not provided, a default bucket 
> name is generated using the GCP project and job region. 
> https://github.com/apache/beam/blob/39dab79b7bd56a47c0f2a02033ab4ca3bd4a67e2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L319
> We recently added the KMS setting to this bucket creation, but since 
> different jobs might use different KMS keys this is unexpected behavior.
> This bug is to remove this KMS setting from default bucket creation, and 
> possibly error out when a default bucket is used and KMS settings are 
> provided.





[jira] [Resolved] (BEAM-7986) Increase minimum grpcio required version

2019-08-21 Thread Udi Meiri (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri resolved BEAM-7986.
-
Fix Version/s: 2.16.0
   Resolution: Fixed

> Increase minimum grpcio required version
> 
>
> Key: BEAM-7986
> URL: https://issues.apache.org/jira/browse/BEAM-7986
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> According to this question, 1.11.0 is not new enough (1.22.0 reportedly 
> works), and we list the minimum as 1.8.
> https://stackoverflow.com/questions/57479498/beam-channel-object-has-no-attribute-close?noredirect=1#comment101446049_57479498
> Affects DirectRunner Pub/Sub client.





[jira] [Commented] (BEAM-7531) Don't auto-create GCS buckets with KMS settings

2019-08-21 Thread Udi Meiri (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912732#comment-16912732
 ] 

Udi Meiri commented on BEAM-7531:
-

PR was closed for being stale. Closing this.

> Don't auto-create GCS buckets with KMS settings
> ---
>
> Key: BEAM-7531
> URL: https://issues.apache.org/jira/browse/BEAM-7531
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, io-py-gcp
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> For Dataflow jobs, if a staging/temp path is not provided, a default bucket 
> name is generated using the GCP project and job region. 
> https://github.com/apache/beam/blob/39dab79b7bd56a47c0f2a02033ab4ca3bd4a67e2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L319
> We recently added the KMS setting to this bucket creation, but since 
> different jobs might use different KMS keys this is unexpected behavior.
> This bug is to remove this KMS setting from default bucket creation, and 
> possibly error out when a default bucket is used and KMS settings are 
> provided.





[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=299042=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299042
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:11
Start Date: 21/Aug/19 22:11
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316424584
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PortableSchemaCoder.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A version of SchemaCoder that can only produce/consume Row instances.
+ *
+ * Implements the beam:coders:row:v1 standard coder while still satisfying 
the requirement that a
+ * PCollection is only considered to have a schema if its coder is an instance 
of SchemaCoder.
+ */
+public class PortableSchemaCoder extends SchemaCoder {
 
 Review comment:
   Yeah, I need a unique class for the map in CoderTranslation. I originally 
tried to use `RowCoder` for this directly, but that didn't work since it needs 
to be a sub-class of SchemaCoder for PCollection.hasSchema. 
   
   This was the easiest solution I came up with to make it work, but I'm 
definitely open to other ideas. Maybe I could re-work the CoderTranslation 
logic so that `SchemaCoder` instances translate to `beam:coder:row:v1`?
 



Issue Time Tracking
---

Worklog Id: (was: 299042)
Time Spent: 6.5h  (was: 6h 20m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7969) Streaming Dataflow worker doesn't report FnAPI metrics.

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7969?focusedWorklogId=299040=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299040
 ]

ASF GitHub Bot logged work on BEAM-7969:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:04
Start Date: 21/Aug/19 22:04
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on issue #9330: [BEAM-7969] Report 
FnAPI counters as deltas in streaming jobs.
URL: https://github.com/apache/beam/pull/9330#issuecomment-523667317
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299040)
Time Spent: 4h 40m  (was: 4.5h)

> Streaming Dataflow worker doesn't report FnAPI metrics.
> ---
>
> Key: BEAM-7969
> URL: https://issues.apache.org/jira/browse/BEAM-7969
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution, runner-dataflow
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> EOM



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7969) Streaming Dataflow worker doesn't report FnAPI metrics.

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7969?focusedWorklogId=299039=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299039
 ]

ASF GitHub Bot logged work on BEAM-7969:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:04
Start Date: 21/Aug/19 22:04
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on issue #9330: [BEAM-7969] Report 
FnAPI counters as deltas in streaming jobs.
URL: https://github.com/apache/beam/pull/9330#issuecomment-523667317
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299039)
Time Spent: 4.5h  (was: 4h 20m)

> Streaming Dataflow worker doesn't report FnAPI metrics.
> ---
>
> Key: BEAM-7969
> URL: https://issues.apache.org/jira/browse/BEAM-7969
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution, runner-dataflow
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> EOM



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (BEAM-8028) Simplify running of Beam Python on Spark

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver updated BEAM-8028:
--
Description: Currently this requires building and running several 
processes. We should be able to automate most of this away, such as in 
[https://github.com/apache/beam/pull/9043]  (was: Currently this requires 
building and running several processes. We should be able to automate most of 
this away. )

> Simplify running of Beam Python on Spark
> 
>
> Key: BEAM-8028
> URL: https://issues.apache.org/jira/browse/BEAM-8028
> Project: Beam
>  Issue Type: Test
>  Components: sdk-py-core
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>
> Currently this requires building and running several processes. We should be 
> able to automate most of this away, such as in 
> [https://github.com/apache/beam/pull/9043]





[jira] [Commented] (BEAM-7870) Externally configured KafkaIO consumer causes coder problems

2019-08-21 Thread Chad Dombrova (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912720#comment-16912720
 ] 

Chad Dombrova commented on BEAM-7870:
-

Another possible solution:  replace PubsubMessage & KafkaRecord with 
sdk.values.Row, so that they are compatible with the new portable RowCoder 
feature being added in BEAM-7886.  The main downside I see is that the API 
would be considerably uglier on the Java side. 



> Externally configured KafkaIO consumer causes coder problems
> 
>
> Key: BEAM-7870
> URL: https://issues.apache.org/jira/browse/BEAM-7870
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink, sdk-java-core
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> There are limitations for the consumer to work correctly. The biggest issue 
> is the structure of KafkaIO itself, which uses a combination of the source 
> interface and DoFns to generate the desired output. The problem is that the 
> source interface is natively translated by the Flink Runner to support 
> unbounded sources in portability, while the DoFn runs in a Java environment.
> To transfer data between the two, a coder needs to be involved. The initial 
> read does not immediately drop the KafkaRecord structure, which does not 
> work well with our current assumption of only supporting "standard coders" 
> present in all SDKs. Only the subsequent DoFn converts the KafkaRecord 
> structure into a raw KV[byte, byte], but the DoFn won't have the coder 
> available in its environment.
> There are several possible solutions:
>  1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in 
> the Flink Runner
>  2. Modify KafkaIO to immediately drop the KafkaRecord structure
>  3. Add the KafkaRecordCoder to all SDKs
>  4. Add a generic coder, e.g. AvroCoder to all SDKs
> For a workaround which uses (3), please see this patch, which is not a proper 
> fix but adds KafkaRecordCoder to the SDK so that it can be used to 
> encode/decode records: 
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>  
> See also 
> [https://github.com/apache/beam/pull/8251]





[jira] [Created] (BEAM-8028) Simplify running of Beam Python on Spark

2019-08-21 Thread Kyle Weaver (Jira)
Kyle Weaver created BEAM-8028:
-

 Summary: Simplify running of Beam Python on Spark
 Key: BEAM-8028
 URL: https://issues.apache.org/jira/browse/BEAM-8028
 Project: Beam
  Issue Type: Test
  Components: sdk-py-core
Reporter: Kyle Weaver
Assignee: Robert Bradshaw


Currently this requires building and running several processes. We should be 
able to automate most of this away. 





[jira] [Assigned] (BEAM-8028) Simplify running of Beam Python on Spark

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver reassigned BEAM-8028:
-

Assignee: Kyle Weaver  (was: Robert Bradshaw)

> Simplify running of Beam Python on Spark
> 
>
> Key: BEAM-8028
> URL: https://issues.apache.org/jira/browse/BEAM-8028
> Project: Beam
>  Issue Type: Test
>  Components: sdk-py-core
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>
> Currently this requires building and running several processes. We should be 
> able to automate most of this away. 





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299036=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299036
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:46
Start Date: 21/Aug/19 21:46
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523662128
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299036)
Time Spent: 5h  (was: 4h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299034=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299034
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:40
Start Date: 21/Aug/19 21:40
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523660441
 
 
   Run Portable_Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299034)
Time Spent: 4h 50m  (was: 4h 40m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299032=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299032
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:32
Start Date: 21/Aug/19 21:32
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523657737
 
 
   Run Portable_Python PreCommit
   
 



Issue Time Tracking
---

Worklog Id: (was: 299032)
Time Spent: 4h 40m  (was: 4.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Updated] (BEAM-7632) Update Python quickstart guide for Flink and Spark

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver updated BEAM-7632:
--
Description: 
Currently, the documentation says "This runner is not yet available for the 
Python SDK.", which is out of date. 
[https://beam.apache.org/get-started/quickstart-py/]

Edit: when Beam 2.15 is released, we should update the quickstart to use the 
new Python FlinkRunner [1].

 

[1] 
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/flink_runner.py]

  was:Currently, the documentation says "This runner is not yet available for 
the Python SDK.", which is out of date. 
[https://beam.apache.org/get-started/quickstart-py/]


> Update Python quickstart guide for Flink and Spark
> --
>
> Key: BEAM-7632
> URL: https://issues.apache.org/jira/browse/BEAM-7632
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Minor
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently, the documentation says "This runner is not yet available for the 
> Python SDK.", which is out of date. 
> [https://beam.apache.org/get-started/quickstart-py/]
> Edit: when Beam 2.15 is released, we should update the quickstart to use the 
> new Python FlinkRunner [1].
>  
> [1] 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/flink_runner.py]





[jira] [Closed] (BEAM-8027) Add Spark Python wordcount test

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver closed BEAM-8027.
-
Fix Version/s: Not applicable
   Resolution: Duplicate

> Add Spark Python wordcount test
> ---
>
> Key: BEAM-8027
> URL: https://issues.apache.org/jira/browse/BEAM-8027
> Project: Beam
>  Issue Type: Test
>  Components: runner-spark
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
> Fix For: Not applicable
>
>
> We are not catching bugs such as BEAM-7864 because we are not running Python 
> wordcount test on Spark.
> We added these tests for Flink in [https://github.com/apache/beam/pull/8745]





[jira] [Work logged] (BEAM-8015) Get logs for Docker containers that fail to start up

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8015?focusedWorklogId=299019=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299019
 ]

ASF GitHub Bot logged work on BEAM-8015:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:01
Start Date: 21/Aug/19 21:01
Worklog Time Spent: 10m 
  Work Description: ibzib commented on issue #9389: [BEAM-8015] Get logs 
from Docker containers
URL: https://github.com/apache/beam/pull/9389#issuecomment-523647699
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299019)
Time Spent: 50m  (was: 40m)

> Get logs for Docker containers that fail to start up
> 
>
> Key: BEAM-8015
> URL: https://issues.apache.org/jira/browse/BEAM-8015
> Project: Beam
>  Issue Type: Improvement
>  Components: java-fn-execution
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Currently, when SDK worker containers fail to start up properly, an exception 
> is thrown that provides no information about what happened. We can improve 
> debugging by keeping containers around long enough to capture their logs 
> before removing them.





[jira] [Updated] (BEAM-8027) Add Spark Python wordcount test

2019-08-21 Thread Kyle Weaver (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Weaver updated BEAM-8027:
--
Description: 
We are not catching bugs such as BEAM-7864 because we are not running Python 
wordcount test on Spark.

We added these tests for Flink in [https://github.com/apache/beam/pull/8745]

  was:We are not catching bugs such as BEAM-7864 because we are not running 
Python wordcount test on Spark.


> Add Spark Python wordcount test
> ---
>
> Key: BEAM-8027
> URL: https://issues.apache.org/jira/browse/BEAM-8027
> Project: Beam
>  Issue Type: Test
>  Components: runner-spark
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>
> We are not catching bugs such as BEAM-7864 because we are not running Python 
> wordcount test on Spark.
> We added these tests for Flink in [https://github.com/apache/beam/pull/8745]





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299018=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299018
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:59
Start Date: 21/Aug/19 20:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316398723
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 ##
 @@ -556,6 +573,30 @@ public static TimerParameter 
timerParameter(TimerDeclaration decl) {
   TimeDomainParameter() {}
 }
 
+/** Descriptor for a {@link Parameter} of type {@link DoFn.SideInput}. */
+@AutoValue
+public abstract static class SideInputParameter extends Parameter {
+  SideInputParameter() {}
+
+  public abstract TypeDescriptor elementT();
+
+  @Nullable
+  public abstract String fieldAccessString();
 
 Review comment:
   This shouldn't be called fieldAccessString. Maybe sideInputId? It also 
shouldn't be nullable.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 299018)
Time Spent: 5h  (was: 4h 50m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
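
The injection described above can be illustrated outside of Beam. The following is a minimal Python analogy, not Beam's Java implementation (which the description says relies on ByteBuddy code generation). The `SideInput` marker, `analyze`, and `invoke` are hypothetical names invented for this sketch: the signature of the process function is inspected once, parameters carrying a side-input marker are recorded by tag, and the matching values are passed in at call time.

```python
import inspect
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class SideInput:
    """Hypothetical marker standing in for Java's @SideInput("tag")."""
    tag: str


def analyze(fn):
    """Return {parameter name: (tag, declared type)} for marked parameters."""
    hints = get_type_hints(fn, include_extras=True)
    found = {}
    for name in inspect.signature(fn).parameters:
        hint = hints.get(name)
        if get_origin(hint) is Annotated:
            declared, *extras = get_args(hint)
            for extra in extras:
                if isinstance(extra, SideInput):
                    found[name] = (extra.tag, declared)
    return found


def invoke(fn, element, side_inputs):
    """Call fn with the element plus every side input it declares, by tag."""
    kwargs = {}
    for name, (tag, _declared) in analyze(fn).items():
        if tag not in side_inputs:
            raise KeyError(f"no side input registered for tag {tag!r}")
        kwargs[name] = side_inputs[tag]
    return fn(element, **kwargs)


def process(element,
            input1: Annotated[str, SideInput("tag1")],
            input2: Annotated[int, SideInput("tag2")]):
    """Example process function mirroring the signature in the description."""
    return f"{element}:{input1}:{input2}"
```

Because the tags are discovered from the signature, the caller only supplies a tag-to-value mapping, for example `invoke(process, "e", {"tag1": "a", "tag2": 2})`. Type verification is left out of this sketch on purpose.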
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299014=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299014
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:59
Start Date: 21/Aug/19 20:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316398068
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
 ##
 @@ -793,20 +805,27 @@ public String toString() {
   return withSideInputs(Arrays.asList(sideInputs));
 }
 
+public MultiOutput withSideInputs(
+Iterable> sideInputs) {
+  Map> mappedInputs =
+  StreamSupport.stream(sideInputs.spliterator(), false)
+  .collect(Collectors.toMap(v -> v.getTagInternal().getId(), v -> 
v));
+  return withSideInputs(mappedInputs);
+}
 
 Review comment:
   We should support builder-style as well. e.g.
   
   ParDo
   .withSideInput("tag1", pcv1)
   .withSideInput("tag2", pcv2);
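
The builder style suggested here can be sketched independently of Beam. The Python below is an illustration only: `View` and `ParDoSketch` are hypothetical stand-ins, not Beam classes. It shows each `with_side_input` call returning a new object with one more tag-to-view entry and leaving the original untouched, and the iterable overload reusing that method by keying each view on its own tag id, as the diff does with `getTagInternal().getId()`. Rejecting duplicate tags is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Mapping


@dataclass(frozen=True)
class View:
    """Hypothetical stand-in for a PCollectionView; only the tag id matters."""
    tag_id: str


@dataclass(frozen=True)
class ParDoSketch:
    """Hypothetical immutable transform holding side inputs keyed by tag."""
    side_inputs: Mapping[str, View] = field(default_factory=dict)

    def with_side_input(self, tag, view):
        """Return a copy with one extra side input; self is not modified."""
        if tag in self.side_inputs:
            raise ValueError(f"duplicate side input tag: {tag}")
        merged = dict(self.side_inputs)
        merged[tag] = view
        return ParDoSketch(MappingProxyType(merged))

    def with_side_inputs(self, views):
        """Iterable overload: key each view by its own tag id."""
        result = self
        for view in views:
            result = result.with_side_input(view.tag_id, view)
        return result
```

Chained calls such as `ParDoSketch().with_side_input("tag1", v1).with_side_input("tag2", v2)` then read like the example in the review comment.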
 



Issue Time Tracking
---

Worklog Id: (was: 299014)
Time Spent: 4h 50m  (was: 4h 40m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299015=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299015
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:59
Start Date: 21/Aug/19 20:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316397964
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
 ##
 @@ -660,11 +659,25 @@ public static DoFnSchemaInformation 
getDoFnSchemaInformation(
  */
 public SingleOutput withSideInputs(
 Iterable> sideInputs) {
+  Map> mappedInputs =
+  StreamSupport.stream(sideInputs.spliterator(), false)
+  .collect(Collectors.toMap(v -> v.getTagInternal().getId(), v -> 
v));
+  return withSideInputs(mappedInputs);
+}
+
+/**
+ * Returns a new {@link ParDo} {@link PTransform} that's like this {@link 
PTransform} but with
+ * the specified additional side inputs. Does not modify this {@link 
PTransform}.
+ *
+ * See the discussion of Side Inputs above for more explanation.
+ */
+public SingleOutput withSideInputs(
+Map> sideInputs) {
   return new SingleOutput<>(
   fn,
-  ImmutableList.>builder()
-  .addAll(this.sideInputs)
-  .addAll(sideInputs)
+  ImmutableMap.>builder()
+  .putAll(this.sideInputs)
+  .putAll(sideInputs)
   .build(),
   fnDisplayData);
 }
 
 Review comment:
   We should support builder-style as well. e.g.
   
   ParDo
   .withSideInput("tag1", pcv1)
   .withSideInput("tag2", pcv2);
 



Issue Time Tracking
---

Worklog Id: (was: 299015)
Time Spent: 4h 50m  (was: 4h 40m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299013=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299013
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:59
Start Date: 21/Aug/19 20:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316397196
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
 ##
 @@ -177,10 +177,7 @@ public String getUrn(ParDoSingle transform) {
   // TODO: Is there a better way to do this?
   Set allInputs =
   
transform.getInputs().keySet().stream().map(TupleTag::getId).collect(Collectors.toSet());
-  Set sideInputs =
-  parDo.getSideInputs().stream()
-  .map(s -> s.getTagInternal().getId())
-  .collect(Collectors.toSet());
+  Set sideInputs = parDo.getSideInputs().keySet();
 
 Review comment:
   I think this is wrong - we should still get this Set from the internal tag 
like before
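
One possible reading of this concern, sketched in Python with made-up values: once side inputs are stored in a map, the map keys may be user-chosen names, while the transform's inputs are still identified by internal tag ids. The sketch assumes, without confirmation from the hunk shown, that the surrounding code subtracts the side-input ids from all input ids to find the main input. `View`, `internal_id`, and the ids are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class View:
    """Hypothetical stand-in for a side-input view with an internal tag id."""
    internal_id: str


# All input ids of the transform: one main input plus two side inputs.
all_inputs = {"main", "view-0001", "view-0002"}

# Side inputs keyed by user-facing names that differ from the internal ids.
side_inputs = {"tag1": View("view-0001"), "tag2": View("view-0002")}

# Using the map keys fails to remove the side inputs from the input set.
main_from_keys = all_inputs - set(side_inputs)

# Using the internal tag ids isolates the single main input.
main_from_internal = all_inputs - {v.internal_id for v in side_inputs.values()}
```

If the keys always equal the internal ids, the two approaches agree; they diverge only when a caller supplies its own names.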
 



Issue Time Tracking
---

Worklog Id: (was: 299013)
Time Spent: 4h 40m  (was: 4.5h)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299017=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299017
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:59
Start Date: 21/Aug/19 20:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316399743
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
 ##
 @@ -24,8 +24,9 @@
 import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 Review comment:
   We need to validate that the types of the parameters match the types of the 
side inputs. Possibly we should do this in the validate() method - there we can 
look up the elementT of each parameter and make sure it matches the elementT of 
the PCollectionView.
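
The check proposed here can be sketched as a small function. This is a Python illustration under stated assumptions, not the Beam validate() method: `validate_side_inputs` is a hypothetical name, both arguments are plain dictionaries from tag to type, and types are compared by identity, whereas real type descriptors would likely need a subtype-aware comparison.

```python
def validate_side_inputs(parameter_types, view_types):
    """Return a list of problems; an empty list means the declarations match.

    parameter_types: {tag: type declared on the process-method parameter}
    view_types:      {tag: element type of the registered side input}
    """
    problems = []
    for tag, declared in parameter_types.items():
        if tag not in view_types:
            problems.append(f"no side input registered for tag {tag!r}")
        elif view_types[tag] is not declared:
            problems.append(
                f"tag {tag!r}: parameter expects {declared.__name__}, "
                f"side input provides {view_types[tag].__name__}")
    return problems
```

Collecting every mismatch before failing lets a single error report list all offending parameters.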
 



Issue Time Tracking
---

Worklog Id: (was: 299017)
Time Spent: 5h  (was: 4h 50m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299016=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299016
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:59
Start Date: 21/Aug/19 20:59
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316398560
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 ##
 @@ -413,6 +422,14 @@ public static TimestampParameter timestampParameter() {
   return TIMESTAMP_PARAMETER;
 }
 
+public static SideInputParameter sideInputParameter(
+TypeDescriptor elementT, @Nullable String fieldAccessString) {
 
 Review comment:
   This shouldn't be called fieldAccessString. Maybe sideInputId?
 



Issue Time Tracking
---

Worklog Id: (was: 299016)
Time Spent: 5h  (was: 4h 50m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Created] (BEAM-8027) Add Spark Python wordcount test

2019-08-21 Thread Kyle Weaver (Jira)
Kyle Weaver created BEAM-8027:
-

 Summary: Add Spark Python wordcount test
 Key: BEAM-8027
 URL: https://issues.apache.org/jira/browse/BEAM-8027
 Project: Beam
  Issue Type: Test
  Components: runner-spark
Reporter: Kyle Weaver
Assignee: Kyle Weaver


We are not catching bugs such as BEAM-7864 because we are not running a Python 
wordcount test on Spark.





[jira] [Work logged] (BEAM-7742) BigQuery File Loads to work well with load job size limits

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7742?focusedWorklogId=299011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299011
 ]

ASF GitHub Bot logged work on BEAM-7742:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:55
Start Date: 21/Aug/19 20:55
Worklog Time Spent: 10m 
  Work Description: ttanay commented on pull request #9242: [BEAM-7742] 
Partition files in BQFL to cater to quotas & limits
URL: https://github.com/apache/beam/pull/9242#discussion_r316398364
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
 ##
 @@ -244,19 +252,22 @@ def process(self, element, file_prefix):
 destination = element[0]
 rows = element[1]
 
-writer = None
+file_path, writer = None, None
 
 for row in rows:
   if writer is None:
 (file_path, writer) = _make_new_file_writer(file_prefix, destination)
-yield (destination, file_path)
 
   writer.write(self.coder.encode(row))
   writer.write(b'\n')
 
-  if writer.tell() > self.max_file_size:
+  file_size = writer.tell()
+  if file_size > self.max_file_size:
 writer.close()
-writer = None
+yield (destination, (file_path, file_size))
+file_path, writer = None, None
+if writer is not None:
+  yield (destination, (file_path, file_size))
 
 Review comment:
   Done
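
The control flow of the hunk above can be tried in isolation. The following Python sketch is not the Beam code: `write_rows` and `make_opener` are hypothetical helpers, in-memory buffers replace real files, and rows are assumed to be already encoded bytes. It emits each file together with its final size only once the file is complete, including the last, partially filled file. Closing the last writer before yielding is an assumption of the sketch; the hunk shown does not include that call.

```python
import io


def write_rows(rows, max_file_size, open_writer):
    """Yield (file_path, file_size) once per completed file."""
    file_path, writer, file_size = None, None, 0
    for row in rows:
        if writer is None:
            file_path, writer = open_writer()
        writer.write(row)
        writer.write(b"\n")
        # Record the size before closing; a closed buffer cannot report it.
        file_size = writer.tell()
        if file_size > max_file_size:
            writer.close()
            yield file_path, file_size
            file_path, writer = None, None
    if writer is not None:
        # Flush the last file, which never crossed the size threshold.
        writer.close()
        yield file_path, file_size


def make_opener():
    """Return a factory producing numbered in-memory 'files' for the demo."""
    count = 0

    def open_writer():
        nonlocal count
        count += 1
        return f"file-{count}", io.BytesIO()

    return open_writer
```

Yielding after the file is closed, rather than when it is opened, is what makes the size available to downstream steps.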
 



Issue Time Tracking
---

Worklog Id: (was: 299011)
Time Spent: 3h 50m  (was: 3h 40m)

> BigQuery File Loads to work well with load job size limits
> --
>
> Key: BEAM-7742
> URL: https://issues.apache.org/jira/browse/BEAM-7742
> Project: Beam
>  Issue Type: Improvement
>  Components: io-py-gcp
>Reporter: Pablo Estrada
>Assignee: Tanay Tummalapalli
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Load jobs into BigQuery have a number of limitations: 
> [https://cloud.google.com/bigquery/quotas#load_jobs]
>  
> Currently, the Python BQ sink implemented in `bigquery_file_loads.py` does 
> not handle these limitations well. Improvements need to be made to the 
> implementation to:
>  * Decide to use temp_tables dynamically at pipeline execution
>  * Add code to determine when a load job to a single destination needs to be 
> partitioned into multiple jobs.
>  * When this happens, then we definitely need to use temp_tables, in case one 
> of the two load jobs fails, and the pipeline is rerun.
> Tanay, would you be able to look at this?
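
The partitioning step in the list above can be sketched as a greedy grouping. This Python example is an assumption-laden illustration, not the change under review: `partition_files` is a hypothetical name, and the two limits are plain parameters rather than actual BigQuery quota values. Files are taken as (path, size) pairs, and a new partition starts whenever adding the next file would exceed either limit.

```python
def partition_files(files, max_files, max_bytes):
    """Group (path, size) pairs into lists that respect both limits.

    A single file larger than max_bytes still gets a partition of its own.
    """
    partitions, current, current_bytes = [], [], 0
    for path, size in files:
        too_many = len(current) + 1 > max_files
        too_big = current_bytes + size > max_bytes
        if current and (too_many or too_big):
            partitions.append(current)
            current, current_bytes = [], 0
        current.append(path)
        current_bytes += size
    if current:
        partitions.append(current)
    return partitions
```

When the result holds more than one partition for a destination, that is the case in which the description calls for loading through temp tables.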





[jira] [Work logged] (BEAM-7742) BigQuery File Loads to work well with load job size limits

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7742?focusedWorklogId=299009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299009
 ]

ASF GitHub Bot logged work on BEAM-7742:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:55
Start Date: 21/Aug/19 20:55
Worklog Time Spent: 10m 
  Work Description: ttanay commented on issue #9242: [BEAM-7742] Partition 
files in BQFL to cater to quotas & limits
URL: https://github.com/apache/beam/pull/9242#issuecomment-523645356
 
 
   All checks pass :heavy_check_mark: 
   @pabloem PTAL
 



Issue Time Tracking
---

Worklog Id: (was: 299009)
Time Spent: 3h 40m  (was: 3.5h)

> BigQuery File Loads to work well with load job size limits
> --
>
> Key: BEAM-7742
> URL: https://issues.apache.org/jira/browse/BEAM-7742
> Project: Beam
>  Issue Type: Improvement
>  Components: io-py-gcp
>Reporter: Pablo Estrada
>Assignee: Tanay Tummalapalli
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Load jobs into BigQuery have a number of limitations: 
> [https://cloud.google.com/bigquery/quotas#load_jobs]
>  
> Currently, the Python BQ sink implemented in `bigquery_file_loads.py` does 
> not handle these limitations well. Improvements need to be made to the 
> implementation to:
>  * Decide to use temp_tables dynamically at pipeline execution
>  * Add code to determine when a load job to a single destination needs to be 
> partitioned into multiple jobs.
>  * When this happens, then we definitely need to use temp_tables, in case one 
> of the two load jobs fails, and the pipeline is rerun.
> Tanay, would you be able to look at this?





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299008=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299008
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:54
Start Date: 21/Aug/19 20:54
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316366065
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
 ##
 @@ -660,11 +659,25 @@ public static DoFnSchemaInformation 
getDoFnSchemaInformation(
  */
 public SingleOutput withSideInputs(
 Iterable> sideInputs) {
+  Map> mappedInputs =
+  StreamSupport.stream(sideInputs.spliterator(), false)
+  .collect(Collectors.toMap(v -> v.getTagInternal().getId(), v -> 
v));
+  return withSideInputs(mappedInputs);
+}
+
+/**
+ * Returns a new {@link ParDo} {@link PTransform} that's like this {@link 
PTransform} but with
+ * the specified additional side inputs. Does not modify this {@link 
PTransform}.
+ *
+ * See the discussion of Side Inputs above for more explanation.
+ */
+public SingleOutput withSideInputs(
+Map> sideInputs) {
   return new SingleOutput<>(
   fn,
-  ImmutableList.>builder()
-  .addAll(this.sideInputs)
-  .addAll(sideInputs)
+  ImmutableMap.>builder()
+  .putAll(this.sideInputs)
+  .putAll(sideInputs)
   .build(),
   fnDisplayData);
 }
 
 Review comment:
   We should support builder-style as well. e.g.
   
   ParDo
  .withSideInput("tag1", pcv1)
  .withSideInput("tag2", pcv2);
 



Issue Time Tracking
---

Worklog Id: (was: 299008)
Time Spent: 4.5h  (was: 4h 20m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299005=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299005
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:52
Start Date: 21/Aug/19 20:52
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316396979
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
 ##
 @@ -177,10 +177,7 @@ public String getUrn(ParDoSingle transform) {
   // TODO: Is there a better way to do this?
   Set allInputs =
   
transform.getInputs().keySet().stream().map(TupleTag::getId).collect(Collectors.toSet());
-  Set sideInputs =
-  parDo.getSideInputs().stream()
-  .map(s -> s.getTagInternal().getId())
-  .collect(Collectors.toSet());
+  Set sideInputs = parDo.getSideInputs().keySet();
 
 Review comment:
   I think this is wrong - we should still get this Set from getTagInternal 
like before.
 



Issue Time Tracking
---

Worklog Id: (was: 299005)
Time Spent: 4h 10m  (was: 4h)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299006=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299006
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:52
Start Date: 21/Aug/19 20:52
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316396979
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
 ##
 @@ -177,10 +177,7 @@ public String getUrn(ParDoSingle transform) {
   // TODO: Is there a better way to do this?
   Set allInputs =
   
transform.getInputs().keySet().stream().map(TupleTag::getId).collect(Collectors.toSet());
-  Set sideInputs =
-  parDo.getSideInputs().stream()
-  .map(s -> s.getTagInternal().getId())
-  .collect(Collectors.toSet());
+  Set sideInputs = parDo.getSideInputs().keySet();
 
 Review comment:
   I think this is wrong - we should still get this Set from getTagInternal 
like before.
 



Issue Time Tracking
---

Worklog Id: (was: 299006)
Time Spent: 4h 20m  (was: 4h 10m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=299003=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299003
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:50
Start Date: 21/Aug/19 20:50
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316396289
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java
 ##
 @@ -63,32 +63,52 @@
 
   private final String projectId;
   private final String query;
+  private final boolean usingStandardSql;
 
 Review comment:
   Thanks for the pointer! Then maybe we should set up `usingLegacySql` instead 
of `usingStandardSql` to stay consistent with the BQ model. wdyt?
 



Issue Time Tracking
---

Worklog Id: (was: 299003)
Time Spent: 25h 40m  (was: 25.5h)

> A new count distinct transform based on BigQuery compatible HyperLogLog++ 
> implementation
> 
>
> Key: BEAM-7013
> URL: https://issues.apache.org/jira/browse/BEAM-7013
> Project: Beam
>  Issue Type: New Feature
>  Components: extensions-java-sketching, sdk-java-core
>Reporter: Yueyang Qiu
>Assignee: Yueyang Qiu
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 25h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299002
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:50
Start Date: 21/Aug/19 20:50
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316396047
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+The worker pool uses child processes for parallelism; threads are
 
 Review comment:
   The documentation applies to the entry point, which always uses processes. 
It's OK if tests use the inner class separately; it should have its own 
documentation (I did not modify that part, just moved it).
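
The distinction drawn above, an entry point that always starts child processes versus an inner class that tests may drive separately, can be sketched roughly as follows. This is a minimal illustration only: `SketchWorkerPool`, `use_process`, and the sleeping child command are hypothetical names chosen for the example and are not taken from `worker_pool_main.py`.

```python
import subprocess
import sys
import threading


class SketchWorkerPool(object):
  """Hypothetical pool; not Beam's BeamFnExternalWorkerPoolServicer."""

  def __init__(self, use_process):
    self._use_process = use_process
    self._workers = {}  # worker_id -> callable that stops the worker

  def start_worker(self, worker_id):
    if self._use_process:
      # Child process: it runs in its own interpreter with its own GIL.
      proc = subprocess.Popen(
          [sys.executable, "-c", "import time; time.sleep(3600)"])

      def stop():
        proc.kill()
        proc.wait()
    else:
      # Thread: cheap to start, which is convenient in tests.
      stop_event = threading.Event()
      thread = threading.Thread(target=stop_event.wait)
      thread.daemon = True
      thread.start()

      def stop():
        stop_event.set()
        thread.join()
    self._workers[worker_id] = stop

  def stop_worker(self, worker_id):
    # Remove the worker from the pool and run its stop callable.
    self._workers.pop(worker_id)()

  def active_workers(self):
    return sorted(self._workers)
```

Under these assumptions an entry point would construct the pool with `use_process=True`, while a test could pass `use_process=False` to avoid spawning interpreters.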
 



Issue Time Tracking
---

Worklog Id: (was: 299002)
Time Spent: 4.5h  (was: 4h 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-6858) Support side inputs injected into a DoFn

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6858?focusedWorklogId=299001=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299001
 ]

ASF GitHub Bot logged work on BEAM-6858:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:49
Start Date: 21/Aug/19 20:49
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9275: [BEAM-6858] 
Support side inputs injected into a DoFn
URL: https://github.com/apache/beam/pull/9275#discussion_r316395715
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -208,8 +208,15 @@ public static ParDoPayload translateParDo(
 new ParDoLike() {
   @Override
   public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
+Map> sideInputMapping =
+parDo.getSideInputs().entrySet().stream()
+.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
 
 Review comment:
   I believe that this collector is a noop - it will just create the same map 
as the input (I think this might be a remnant of my code).
 



Issue Time Tracking
---

Worklog Id: (was: 299001)
Time Spent: 4h  (was: 3h 50m)

> Support side inputs injected into a DoFn
> 
>
> Key: BEAM-6858
> URL: https://issues.apache.org/jira/browse/BEAM-6858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Beam currently supports injecting main inputs into a DoFn process method. A 
> user can write the following:
> @ProcessElement public void process(@Element InputT element)
> And Beam will (using ByteBuddy code generation) inject the input element into 
> the process method.
> We would like to also support the same for side inputs. For example:
> @ProcessElement public void process(@Element InputT element, 
> @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) 
> This requires the existing process-method analysis framework to capture these 
> side inputs. The ParDo code would have to verify the type of the side input 
> and include them in the list of side inputs. This would also eliminate the 
> need for the user to explicitly call withSideInputs on the ParDo.
>  





[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=299000=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299000
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:46
Start Date: 21/Aug/19 20:46
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316394576
 
 

 ##
 File path: 
sdks/java/extensions/zetasketch/src/test/java/org/apache/beam/sdk/extensions/zetasketch/BigQueryHllSketchCompatibilityIT.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.zetasketch;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryMatcher;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for HLL++ sketch compatibility between Beam and BigQuery. The tests verify
+ * that HLL++ sketches created in Beam can be processed by BigQuery, and vice versa.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryHllSketchCompatibilityIT {
+
+  private static final String DATASET_NAME = "zetasketch_compatibility_test";
+
+  // Table for testReadSketchFromBigQuery()
+  // Schema: only one STRING field named "data".
+  // Content: prepopulated with 4 rows: "Apple", "Orange", "Banana", "Orange"
+  private static final String DATA_TABLE_NAME = "hll_data";
+  private static final String DATA_FIELD_NAME = "data";
+  private static final String QUERY_RESULT_FIELD_NAME = "sketch";
+  private static final Long EXPECTED_COUNT = 3L;
+
+  // Table for testWriteSketchToBigQuery()
+  // Schema: only one BYTES field named "sketch".
+  // Content: will be overridden by the sketch computed by the test pipeline each time the test runs
+  private static final String SKETCH_TABLE_NAME = "hll_sketch";
+  private static final String SKETCH_FIELD_NAME = "sketch";
+  private static final List<String> TEST_DATA =
+      Arrays.asList("Apple", "Orange", "Banana", "Orange");
+  // SHA-1 hash of string "[3]", the string representation of a row that has only one field 3 in it
+  private static final String EXPECTED_CHECKSUM = "f1e31df9806ce94c5bdbbfff9608324930f4d3f1";
+
+  /**
+   * Test that HLL++ sketch computed in BigQuery can be processed by Beam. Hll sketch is computed by
+   * {@code HLL_COUNT.INIT} in BigQuery and read into Beam; the test verifies that we can run {@link
+   * HllCount.MergePartial} and {@link HllCount.Extract} on the sketch in Beam to get the correct
+   * estimated count.
+   */
+  @Test
+  public void testReadSketchFromBigQuery() {
 
 Review comment:
   Any reason for creating the test data manually? IMO, it would make 
operations harder in certain scenarios. For example, if our infra team decides 
to use a different project to run all ITs, your test will break. Instead, how 
about creating your test data in `@BeforeClass` and deleting all of it in 
`@AfterClass`?
 

[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298998=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298998
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:44
Start Date: 21/Aug/19 20:44
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #9188: [BEAM-7886] 
Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316393831
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PortableSchemaCoder.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A version of SchemaCoder that can only produce/consume Row instances.
+ *
+ * Implements the beam:coders:row:v1 standard coder while still satisfying the requirement that a
+ * PCollection is only considered to have a schema if its coder is an instance of SchemaCoder.
+ */
+public class PortableSchemaCoder extends SchemaCoder {
 
 Review comment:
   Is the reason you need a separate class here that you need a unique Class 
object for the map?
 



Issue Time Tracking
---

Worklog Id: (was: 298998)
Time Spent: 6h 20m  (was: 6h 10m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298997=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298997
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:42
Start Date: 21/Aug/19 20:42
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316393193
 
 

 ##
 File path: sdks/python/apache_beam/typehints/schemas.py
 ##
 @@ -0,0 +1,217 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Support for mapping python types to proto Schemas and back again.
+
+Python          Schema
+np.int8     <-> BYTE
+np.int16    <-> INT16
+np.int32    <-> INT32
+np.int64    <-> INT64
+int         ---/
+np.float32  <-> FLOAT
+np.float64  <-> DOUBLE
+float       ---/
+bool        <-> BOOLEAN
+
+The mappings for STRING and BYTES are different between python 2 and python 3,
+because of the changes to str:
+py3:
+str/unicode <-> STRING
+bytes       <-> BYTES
+ByteString  ---/
+
+py2:
+unicode     <-> STRING
+str/bytes   ---/
+ByteString  <-> BYTES
+"""
+
+from __future__ import absolute_import
+
+import sys
+from typing import ByteString
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+from uuid import uuid4
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.native_type_compatibility import _get_args
+from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
+from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple
+from apache_beam.typehints.native_type_compatibility import _match_is_optional
+from apache_beam.typehints.native_type_compatibility import _safe_issubclass
+from apache_beam.typehints.native_type_compatibility import extract_optional_type
+
+
+# Registry of typings for a schema by UUID
+class SchemaTypeRegistry(object):
+  def __init__(self):
+    self.by_id = {}
+    self.by_typing = {}
+
+  def add(self, typing, schema):
+    self.by_id[schema.id] = (typing, schema)
+
+  def get_typing_by_id(self, unique_id):
+    result = self.by_id.get(unique_id, None)
+    return result[0] if result is not None else None
+
+  def get_schema_by_id(self, unique_id):
+    result = self.by_id.get(unique_id, None)
+    return result[1] if result is not None else None
+
+
+SCHEMA_REGISTRY = SchemaTypeRegistry()
+
+
+# Bi-directional mappings
+_PRIMITIVES = (
+(np.int8, schema_pb2.AtomicType.BYTE),
+(np.int16, schema_pb2.AtomicType.INT16),
+(np.int32, schema_pb2.AtomicType.INT32),
+(np.int64, schema_pb2.AtomicType.INT64),
+(np.float32, schema_pb2.AtomicType.FLOAT),
+(np.float64, schema_pb2.AtomicType.DOUBLE),
+(unicode, schema_pb2.AtomicType.STRING),
+(bool, schema_pb2.AtomicType.BOOLEAN),
+(bytes if sys.version_info.major >= 3 else ByteString,
+ schema_pb2.AtomicType.BYTES),
+)
+
+PRIMITIVE_TO_ATOMIC_TYPE = dict((typ, atomic) for typ, atomic in _PRIMITIVES)
+ATOMIC_TYPE_TO_PRIMITIVE = dict((atomic, typ) for typ, atomic in _PRIMITIVES)
+
+# One-way mappings
+PRIMITIVE_TO_ATOMIC_TYPE.update({
+# In python 3, this is a no-op because str == unicode,
+# but in python 2 it overrides the bytes -> BYTES mapping.
+str: schema_pb2.AtomicType.STRING,
 
 Review comment:
   So the runtime types don't necessarily line up exactly with the typing 
representation of the schema. For example even though a schema may have an 
attribute with `np.int*` type, we still actually produce and consume `int` 
instances, and never use `np.int*` instances at runtime.
   
   In this case, the typing might say `str`, but RowCoder uses StrUtf8Coder to 
produce/consume instances of `past.builtins.unicode` at runtime.
   
   I agree this could be a little confusing for users. We discussed it [on the 
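
The gap between the declared field type and the runtime value described in this comment can be illustrated with a small standard-library sketch. It assumes nothing about RowCoder itself: `Int32` is a hypothetical stand-in for `np.int32`, built with `typing.NewType` so the example runs without numpy.

```python
import typing

# Hypothetical stand-in for np.int32, used only to keep the sketch stdlib-only.
Int32 = typing.NewType("Int32", int)

Person = typing.NamedTuple("Person", [("name", str), ("age", Int32)])

person = Person(name="Ada", age=Int32(36))

# The declared type is what a schema would be derived from.
declared = Person.__annotations__["age"]
# The runtime type is what is actually produced and consumed.
runtime = type(person.age)
```

Here `declared` is the narrow `Int32` marker while `runtime` is plain `int`, which mirrors the situation where the typing says one thing and the values flowing through the pipeline are a broader built-in type.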

[jira] [Work logged] (BEAM-7013) A new count distinct transform based on BigQuery compatible HyperLogLog++ implementation

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7013?focusedWorklogId=298996=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298996
 ]

ASF GitHub Bot logged work on BEAM-7013:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:40
Start Date: 21/Aug/19 20:40
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #9144: [BEAM-7013] 
Integrating ZetaSketch's HLL++ algorithm with Beam
URL: https://github.com/apache/beam/pull/9144#discussion_r316392094
 
 

 ##
 File path: sdks/java/extensions/zetasketch/build.gradle
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: ZetaSketch"
+
+def zetasketch_version = "0.1.0"
+
+dependencies {
+compile library.java.vendored_guava_26_0_jre
+compile project(path: ":sdks:java:core", configuration: "shadow")
+compile "com.google.zetasketch:zetasketch:$zetasketch_version"
+testCompile library.java.junit
+testCompile project(":sdks:java:io:google-cloud-platform")
+testRuntimeOnly project(":runners:direct-java")
+testRuntimeOnly project(":runners:google-cloud-dataflow-java")
+}
+
+/**
+ * Integration tests running on Dataflow with BigQuery.
+ */
+task integrationTest(type: Test) {
+group = "Verification"
+def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
+systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+"--runner=TestDataflowRunner",
+"--project=${gcpProject}",
+"--tempRoot=${gcpTempRoot}",
+])
 
 Review comment:
   But you want to run our test with DataflowRunner, right? Then if you want to 
always run with the worker built from head, you need to add two more 
command-line args, like:
   
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/build.gradle#L161
 Otherwise, the prebuilt worker image is used.
 



Issue Time Tracking
---

Worklog Id: (was: 298996)
Time Spent: 25h 20m  (was: 25h 10m)

> A new count distinct transform based on BigQuery compatible HyperLogLog++ 
> implementation
> 
>
> Key: BEAM-7013
> URL: https://issues.apache.org/jira/browse/BEAM-7013
> Project: Beam
>  Issue Type: New Feature
>  Components: extensions-java-sketching, sdk-java-core
>Reporter: Yueyang Qiu
>Assignee: Yueyang Qiu
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 25h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298991=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298991
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:31
Start Date: 21/Aug/19 20:31
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9188: [BEAM-7886] 
Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316388296
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -115,8 +115,7 @@ def get_version():
 'mock>=1.0.1,<3.0.0',
 'pymongo>=3.8.0,<4.0.0',
 'oauth2client>=2.0.1,<4',
-# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
-'protobuf>=3.5.0.post1,<4',
+'protobuf>=3.8.0.post1,<4',
 
 Review comment:
   If it would not be a difficult fix, or would not cause future issues, it is 
better to keep the version range as is.
 



Issue Time Tracking
---

Worklog Id: (was: 298991)
Time Spent: 6h  (was: 5h 50m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298987=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298987
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:28
Start Date: 21/Aug/19 20:28
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316387149
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder_test.py
 ##
 @@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+from itertools import chain
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.coders import RowCoder
+from apache_beam.coders.typecoders import registry as coders_registry
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import typing_to_runner_api
+
+Person = typing.NamedTuple("Person", [
+("name", unicode),
+("age", np.int32),
+("address", typing.Optional[unicode]),
+("aliases", typing.List[unicode]),
+])
+
+coders_registry.register_coder(Person, RowCoder)
 
 Review comment:
   Yes it is right now. I was hesitant to make RowCoder the default coder for 
any NamedTuple sub-class instance, and I thought this was a good way to make it 
opt in.
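
The opt-in design described above, where a type only gets the row-style coder after an explicit registration and everything else keeps the default, can be sketched as follows. This is an illustration under stated assumptions rather than Beam's coder registry: `SketchRegistry`, `SketchRowCoder`, and `FallbackCoder` are hypothetical names, and the JSON encoding is a placeholder for a real schema-aware format.

```python
import collections
import json
import pickle


class FallbackCoder(object):
  """Hypothetical default coder: pickles whatever it is given."""

  def encode(self, value):
    return pickle.dumps(value)

  def decode(self, data):
    return pickle.loads(data)


class SketchRowCoder(object):
  """Hypothetical schema-aware coder bound to one namedtuple type."""

  def __init__(self, typ):
    self._typ = typ

  def encode(self, value):
    # Encode the fields positionally; the field order comes from the type.
    return json.dumps(list(value)).encode("utf-8")

  def decode(self, data):
    return self._typ(*json.loads(data.decode("utf-8")))


class SketchRegistry(object):
  def __init__(self):
    self._factories = {}

  def register_coder(self, typ, coder_factory):
    self._factories[typ] = coder_factory

  def get_coder(self, typ):
    factory = self._factories.get(typ)
    # Types that never opted in keep the fallback behaviour.
    return factory(typ) if factory is not None else FallbackCoder()


Person = collections.namedtuple("Person", ["name", "age"])
Point = collections.namedtuple("Point", ["x", "y"])

registry = SketchRegistry()
registry.register_coder(Person, SketchRowCoder)  # explicit opt-in

person_coder = registry.get_coder(Person)
point_coder = registry.get_coder(Point)
```

In this sketch `Person` resolves to the row-style coder and `Point` to the fallback, so existing namedtuple types are unaffected until someone registers them.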
 



Issue Time Tracking
---

Worklog Id: (was: 298987)
Time Spent: 5h 40m  (was: 5.5h)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298988=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298988
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:28
Start Date: 21/Aug/19 20:28
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316387149
 
 

 ##
 File path: sdks/python/apache_beam/coders/row_coder_test.py
 ##
 @@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+from itertools import chain
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.coders import RowCoder
+from apache_beam.coders.typecoders import registry as coders_registry
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import typing_to_runner_api
+
+Person = typing.NamedTuple("Person", [
+("name", unicode),
+("age", np.int32),
+("address", typing.Optional[unicode]),
+("aliases", typing.List[unicode]),
+])
+
+coders_registry.register_coder(Person, RowCoder)
 
 Review comment:
   Yes it is right now. I was hesitant to make RowCoder the default coder for 
any NamedTuple sub-class, and I thought this was a good way to make it opt in.
 



Issue Time Tracking
---

Worklog Id: (was: 298988)
Time Spent: 5h 50m  (was: 5h 40m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298986=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298986
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:25
Start Date: 21/Aug/19 20:25
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #9188: 
[BEAM-7886] Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316386018
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -115,8 +115,7 @@ def get_version():
 'mock>=1.0.1,<3.0.0',
 'pymongo>=3.8.0,<4.0.0',
 'oauth2client>=2.0.1,<4',
-# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
-'protobuf>=3.5.0.post1,<4',
+'protobuf>=3.8.0.post1,<4',
 
 Review comment:
   I bumped it because I used a 3.8.0 feature without realizing it (using 
`schema_pb2.AtomicType.STRING`, rather than `schema_pb2.STRING`). This is 
actually the bug I was referencing in 
https://lists.apache.org/thread.html/e174967bab6c224db40f66af74e7d07466812cc6b78189e55da88cb0@%3Cdev.beam.apache.org%3E
   
   If it's going to be a problem I can change it back and update my code; 
bumping it to 3.8.0 was just the lazy fix.

 



Issue Time Tracking
---

Worklog Id: (was: 298986)
Time Spent: 5.5h  (was: 5h 20m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298982=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298982
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:22
Start Date: 21/Aug/19 20:22
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316384687
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+The worker pool uses child processes for parallelism; threads are
 
 Review comment:
   It is a bit odd to read this here because `BeamFnExternalWorkerPoolServicer` 
supports threads internally and tests make use of this feature. If this module 
is the entry point for the worker pool, maybe the pooling code itself should be 
located in a different module and we should only handle its configuration here.
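
   A minimal sketch of the split suggested above, assuming a hypothetical `WorkerPool` class that would live in its own module while the entry point only parses configuration. The flag names `--service_port` and `--container_executable` are the ones passed by boot.go in this PR; the class name, `parse_config`, and `build_pool` are illustrative only and are not part of the PR.

```python
import argparse


class WorkerPool(object):
    """Hypothetical pooling class; in the suggested layout it would be
    defined in a separate module, not in the entry point."""

    def __init__(self, use_process, container_executable=None):
        self.use_process = use_process
        self.container_executable = container_executable


def parse_config(argv):
    """Entry-point concern: turn command-line flags into options."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--service_port', type=int, required=True)
    parser.add_argument('--container_executable', default=None)
    return parser.parse_args(argv)


def build_pool(argv):
    """Wire parsed options into the pool without any pooling logic here."""
    options = parse_config(argv)
    pool = WorkerPool(use_process=True,
                      container_executable=options.container_executable)
    return options.service_port, pool
```

   With this layout, tests could construct `WorkerPool` directly (threads or processes) without going through the command-line entry point.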
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298982)
Time Spent: 4h 20m  (was: 4h 10m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-8015) Get logs for Docker containers that fail to start up

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8015?focusedWorklogId=298977=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298977
 ]

ASF GitHub Bot logged work on BEAM-8015:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:17
Start Date: 21/Aug/19 20:17
Worklog Time Spent: 10m 
  Work Description: ibzib commented on issue #9389: [BEAM-8015] Get logs 
from Docker containers
URL: https://github.com/apache/beam/pull/9389#issuecomment-523631596
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298977)
Time Spent: 40m  (was: 0.5h)

> Get logs for Docker containers that fail to start up
> 
>
> Key: BEAM-8015
> URL: https://issues.apache.org/jira/browse/BEAM-8015
> Project: Beam
>  Issue Type: Improvement
>  Components: java-fn-execution
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently, when SDK worker containers fail to start up properly, an exception 
> is thrown that provides no information about what happened. We can improve 
> debugging by keeping containers around long enough to log their logs before 
> removing them.
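
The idea in the description above can be sketched as follows. This is an illustration only: the actual change targets the Java `java-fn-execution` component, and the function names and the injectable `run` parameter here are assumptions made so the sketch can be exercised without Docker. The only real commands used are `docker logs` and `docker rm -f`.

```python
import subprocess


def default_runner(args):
    """Run a command and return (exit_code, combined stdout and stderr)."""
    proc = subprocess.run(args, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT, universal_newlines=True)
    return proc.returncode, proc.stdout


def collect_logs_then_remove(container_id, run=default_runner):
    """Fetch a failed container's logs before removing the container.

    The order matters: once the container is removed, its logs are gone.
    """
    _, logs = run(['docker', 'logs', container_id])
    run(['docker', 'rm', '-f', container_id])
    return logs
```

A caller that detects a startup failure could include the returned text in the exception message it throws, so the failure is no longer silent.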



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298973=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298973
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:12
Start Date: 21/Aug/19 20:12
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316380746
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+This entry point is used by the Python SDK container in worker pool mode.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+
+# Register to kill the subprocesses on exit.
+def kill_worker_processes():
+  for worker_process in cls._worker_processes.values():
+worker_process.kill()
+atexit.register(kill_worker_processes)
+
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
 
 Review comment:
   Makes sense. Thank you for documenting.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298973)
Time Spent: 4h 10m  (was: 4h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298974
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:12
Start Date: 21/Aug/19 20:12
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316380301
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -59,6 +60,18 @@ const (
 
 func main() {
flag.Parse()
+
+   if *workerPool == true {
+   args := []string{
+   "-m",
+   "apache_beam.runners.worker.worker_pool_main",
+   "--service_port=5",
+   "--container_executable=/opt/apache/beam/boot",
+   }
+   log.Printf("Starting Python SDK worker pool: python %v", 
strings.Join(args, " "))
+   log.Fatalf("Python SDK worker pool exited: %v", 
execx.Execute("python", args...))
 
 Review comment:
   Sounds good. Do you mind adding a TODO here for that follow-up PR?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298974)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298970=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298970
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:08
Start Date: 21/Aug/19 20:08
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9188: [BEAM-7886] 
Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316378981
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -115,8 +115,7 @@ def get_version():
 'mock>=1.0.1,<3.0.0',
 'pymongo>=3.8.0,<4.0.0',
 'oauth2client>=2.0.1,<4',
-# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
-'protobuf>=3.5.0.post1,<4',
+'protobuf>=3.8.0.post1,<4',
 
 Review comment:
   > @aaltay do you think this might be an issue?
   Upping the lower bound for protobuf should be OK. @TheNeuralBit, what is the 
reason for this change? Does something require this newer version? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298970)
Time Spent: 5h 10m  (was: 5h)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=298971=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298971
 ]

ASF GitHub Bot logged work on BEAM-7886:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:08
Start Date: 21/Aug/19 20:08
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9188: [BEAM-7886] 
Make row coder a standard coder and implement in Python
URL: https://github.com/apache/beam/pull/9188#discussion_r316378981
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -115,8 +115,7 @@ def get_version():
 'mock>=1.0.1,<3.0.0',
 'pymongo>=3.8.0,<4.0.0',
 'oauth2client>=2.0.1,<4',
-# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
-'protobuf>=3.5.0.post1,<4',
+'protobuf>=3.8.0.post1,<4',
 
 Review comment:
   > @aaltay do you think this might be an issue?
   
   Upping the lower bound for protobuf should be OK. @TheNeuralBit, what is the 
reason for this change? Does something require this newer version? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298971)
Time Spent: 5h 20m  (was: 5h 10m)

> Make row coder a standard coder and implement in python
> ---
>
> Key: BEAM-7886
> URL: https://issues.apache.org/jira/browse/BEAM-7886
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model, sdk-java-core, sdk-py-core
>Reporter: Brian Hulette
>Assignee: Brian Hulette
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-8008) Show error message from expansion service in Java External transform

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8008?focusedWorklogId=298965=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298965
 ]

ASF GitHub Bot logged work on BEAM-8008:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:04
Start Date: 21/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #9377: [BEAM-8008] show 
error message from expansion service in Java External transform
URL: https://github.com/apache/beam/pull/9377#issuecomment-523627020
 
 
   LGTM. Thanks.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298965)
Time Spent: 0.5h  (was: 20m)

> Show error message from expansion service in Java External transform
> 
>
> Key: BEAM-8008
> URL: https://issues.apache.org/jira/browse/BEAM-8008
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Heejong Lee
>Assignee: Heejong Lee
>Priority: Major
>  Labels: portability
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> show error message from expansion service in Java External transform



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-8008) Show error message from expansion service in Java External transform

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8008?focusedWorklogId=298966=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298966
 ]

ASF GitHub Bot logged work on BEAM-8008:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:04
Start Date: 21/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #9377: [BEAM-8008] show 
error message from expansion service in Java External transform
URL: https://github.com/apache/beam/pull/9377#issuecomment-523627057
 
 
   Retest this please
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298966)
Time Spent: 40m  (was: 0.5h)

> Show error message from expansion service in Java External transform
> 
>
> Key: BEAM-8008
> URL: https://issues.apache.org/jira/browse/BEAM-8008
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-core
>Reporter: Heejong Lee
>Assignee: Heejong Lee
>Priority: Major
>  Labels: portability
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> show error message from expansion service in Java External transform



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (BEAM-6114) SQL join selection should be done in planner, not in expansion to PTransform

2019-08-21 Thread Rui Wang (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16912643#comment-16912643
 ] 

Rui Wang commented on BEAM-6114:


I am not sure if I understand the thread. 

A very basic advantage of separate RelNodes is cost-based optimization (CBO): 
each Rel implementation can compute its own cost independently. Merging the 
different implementations of Join into one Rel apparently makes CBO complicated. 
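
To illustrate the point, here is a toy sketch (not Calcite's API) in which each join implementation reports its own cost and the planner simply picks the cheapest. The class names, the `JoinInputs` type, and the cost formulas are all made up for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JoinInputs:
    left_rows: float
    right_rows: float


class CoGBKJoin:
    name = "cogbk"

    def cost(self, inputs):
        # Assumed formula: both sides are shuffled, so both are penalized.
        return 2.0 * (inputs.left_rows + inputs.right_rows)


class SideInputJoin:
    name = "side_input"

    def cost(self, inputs):
        # Assumed formula: the right side is replicated to every worker,
        # which is cheap only when that side is small.
        return inputs.left_rows + 100.0 * inputs.right_rows


CANDIDATES = [CoGBKJoin(), SideInputJoin()]


def choose_join(inputs, candidates=CANDIDATES):
    """Pick the candidate whose self-reported cost is lowest."""
    return min(candidates, key=lambda c: c.cost(inputs))
```

With one merged Rel, a single cost function would have to encode every algorithm's branch, which is the complication mentioned above.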

> SQL join selection should be done in planner, not in expansion to PTransform
> 
>
> Key: BEAM-6114
> URL: https://issues.apache.org/jira/browse/BEAM-6114
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Rahul Patwari
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently Beam SQL joins all go through a single physical operator which has 
> a single PTransform that does all join algorithms based on properties of its 
> input PCollections as well as the relational algebra.
> A first step is to make the needed information part of the relational 
> algebra, so it can choose a PTransform based on that, and the PTransforms can 
> be simpler.
> Second step is to have separate (physical) relational operators for different 
> join algorithms.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-6114) SQL join selection should be done in planner, not in expansion to PTransform

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6114?focusedWorklogId=298958=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298958
 ]

ASF GitHub Bot logged work on BEAM-6114:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:56
Start Date: 21/Aug/19 19:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9395: [BEAM-6114] 
Calcite Rules to Select Type of Join in BeamSQL
URL: https://github.com/apache/beam/pull/9395#discussion_r316372486
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamStandardJoinRule.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamStandardJoinRel;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+public class BeamStandardJoinRule extends RelOptRule {
 
 Review comment:
   Explain matching condition in javadoc.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298958)

> SQL join selection should be done in planner, not in expansion to PTransform
> 
>
> Key: BEAM-6114
> URL: https://issues.apache.org/jira/browse/BEAM-6114
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Rahul Patwari
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently Beam SQL joins all go through a single physical operator which has 
> a single PTransform that does all join algorithms based on properties of its 
> input PCollections as well as the relational algebra.
> A first step is to make the needed information part of the relational 
> algebra, so it can choose a PTransform based on that, and the PTransforms can 
> be simpler.
> Second step is to have separate (physical) relational operators for different 
> join algorithms.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-6114) SQL join selection should be done in planner, not in expansion to PTransform

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6114?focusedWorklogId=298959=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298959
 ]

ASF GitHub Bot logged work on BEAM-6114:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:56
Start Date: 21/Aug/19 19:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9395: [BEAM-6114] 
Calcite Rules to Select Type of Join in BeamSQL
URL: https://github.com/apache/beam/pull/9395#discussion_r316372312
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamStandardJoinRule.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamStandardJoinRel;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+public class BeamStandardJoinRule extends RelOptRule {
 
 Review comment:
   Rename to BeamCoGBKJoinRel?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298959)
Time Spent: 2h  (was: 1h 50m)

> SQL join selection should be done in planner, not in expansion to PTransform
> 
>
> Key: BEAM-6114
> URL: https://issues.apache.org/jira/browse/BEAM-6114
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Rahul Patwari
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently Beam SQL joins all go through a single physical operator which has 
> a single PTransform that does all join algorithms based on properties of its 
> input PCollections as well as the relational algebra.
> A first step is to make the needed information part of the relational 
> algebra, so it can choose a PTransform based on that, and the PTransforms can 
> be simpler.
> Second step is to have separate (physical) relational operators for different 
> join algorithms.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7520) DirectRunner timers are not strictly time ordered

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7520?focusedWorklogId=298955=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298955
 ]

ASF GitHub Bot logged work on BEAM-7520:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:56
Start Date: 21/Aug/19 19:56
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #9190: [BEAM-7520] Fix 
timer firing order in DirectRunner
URL: https://github.com/apache/beam/pull/9190#discussion_r316373884
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
 ##
 @@ -18,10 +18,10 @@
 package org.apache.beam.runners.direct;
 
 import com.google.auto.value.AutoValue;
+import java.util.Optional;
 
 Review comment:
   Yes, I also think that the performance impact of creating one short-lived 
instance should be negligible. I actually tried to measure it, but I didn't get 
any statistically significant difference in this case. I have, however, seen a 
significant impact in other cases (getting rid of a useless instance creation in 
one place in the Spark runner gave me about a 20% performance gain, but that could 
have been a very specific case). But that is probably not that significant to 
this PR. :-)
   
   To conclude this, I will drop the commit where I changed the Optional in 
DirectRunner and start a discussion thread about making the usage consistent 
across the code base. Currently the usage ratio of java.util vs. guava is 
approximately 3:2.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298955)
Time Spent: 8.5h  (was: 8h 20m)

> DirectRunner timers are not strictly time ordered
> -
>
> Key: BEAM-7520
> URL: https://issues.apache.org/jira/browse/BEAM-7520
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Affects Versions: 2.13.0
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> Let's suppose we have the following situation:
>  - stateful ParDo with two timers - timerA and timerB
>  - timerA is set for window.maxTimestamp() + 1
>  - timerB is set anywhere between  timerB.timestamp
>  - input watermark moves to BoundedWindow.TIMESTAMP_MAX_VALUE
> Then the order of timers is as follows (correct):
>  - timerB
>  - timerA
> But, if timerB sets another timer (say for timerB.timestamp + 1), then the 
> order of timers will be:
>  - timerB (timerB.timestamp)
>  - timerA (BoundedWindow.TIMESTAMP_MAX_VALUE)
>  - timerB (timerB.timestamp + 1)
> Which is not ordered by timestamp. The reason for this is that when the input 
> watermark update is evaluated, the WatermarkManager.extractFiredTimers() will 
> produce both timerA and timerB. That would be correct, but when timerB sets 
> another timer, that breaks this.
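
The ordering problem described above can be reproduced with a small simulation. This is a sketch under assumptions: the function names, the integer timestamps, and the `on_fire` callback are hypothetical and do not mirror DirectRunner's actual code, and firing one timer at a time is shown only as one way to get a strictly ordered result, not as the fix adopted in the PR.

```python
import heapq

MAX_TS = 10**9  # stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE


def fire_batch(timers, watermark, on_fire):
    """Extract every eligible timer up front, then fire the whole batch."""
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap and heap[0][0] <= watermark:
        # Snapshot all timers that are eligible under the watermark.
        batch = []
        while heap and heap[0][0] <= watermark:
            batch.append(heapq.heappop(heap))
        for ts, name in batch:
            fired.append((ts, name))
            # Timers set while firing only become visible in the next batch.
            for new_timer in on_fire(ts, name):
                heapq.heappush(heap, new_timer)
    return fired


def fire_one_at_a_time(timers, watermark, on_fire):
    """Re-check the queue after every firing, so new timers are ordered in."""
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap and heap[0][0] <= watermark:
        ts, name = heapq.heappop(heap)
        fired.append((ts, name))
        for new_timer in on_fire(ts, name):
            heapq.heappush(heap, new_timer)
    return fired


def make_reset_once():
    """timerB sets itself again for timestamp + 1, but only the first time."""
    state = {"done": False}

    def on_fire(ts, name):
        if name == "timerB" and not state["done"]:
            state["done"] = True
            return [(ts + 1, "timerB")]
        return []

    return on_fire


timers = [(5, "timerB"), (100, "timerA")]
batch_order = fire_batch(timers, MAX_TS, make_reset_once())
strict_order = fire_one_at_a_time(timers, MAX_TS, make_reset_once())
```

Here `batch_order` fires timerA between the two timerB firings, while `strict_order` fires both timerB instances before timerA.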



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-6114) SQL join selection should be done in planner, not in expansion to PTransform

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6114?focusedWorklogId=298957=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298957
 ]

ASF GitHub Bot logged work on BEAM-6114:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:56
Start Date: 21/Aug/19 19:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9395: [BEAM-6114] 
Calcite Rules to Select Type of Join in BeamSQL
URL: https://github.com/apache/beam/pull/9395#discussion_r316371877
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputJoinRule.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputJoinRel;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+public class BeamSideInputJoinRule extends RelOptRule {
 
 Review comment:
   Can you add a comment to explain the matching condition? The code is 
straightforward to read, but it would also be nice to have something in the javadoc.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 298957)
Time Spent: 1h 50m  (was: 1h 40m)

> SQL join selection should be done in planner, not in expansion to PTransform
> 
>
> Key: BEAM-6114
> URL: https://issues.apache.org/jira/browse/BEAM-6114
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Rahul Patwari
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently Beam SQL joins all go through a single physical operator which has 
> a single PTransform that does all join algorithms based on properties of its 
> input PCollections as well as the relational algebra.
> A first step is to make the needed information part of the relational 
> algebra, so it can choose a PTransform based on that, and the PTransforms can 
> be simpler.
> Second step is to have separate (physical) relational operators for different 
> join algorithms.
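
The second step described above, one physical operator per join algorithm chosen at planning time, can be sketched roughly as follows. This is only an illustration under stated assumptions: the class, the enums, and the selection conditions are hypothetical and are not taken from the Beam or Calcite code in PR #9395. It only shows the idea of deciding the join algorithm from properties of the inputs before any PTransform is built.

```java
import java.util.Objects;

/** Hypothetical sketch; none of these names are Beam or Calcite API. */
public class JoinSelectionSketch {

  /** Assumed input properties that a planner rule could inspect. */
  enum InputKind { BOUNDED, UNBOUNDED, SEEKABLE }

  /** Assumed set of physical join operators, one per join algorithm. */
  enum PhysicalJoin { CO_GBK_JOIN, SIDE_INPUT_JOIN, SIDE_INPUT_LOOKUP_JOIN }

  /**
   * Picks a physical join from the kinds of the two inputs.
   * The conditions below are illustrative assumptions, not Beam's actual matching rules.
   */
  static PhysicalJoin select(InputKind left, InputKind right) {
    Objects.requireNonNull(left, "left");
    Objects.requireNonNull(right, "right");

    // If either side supports point lookups, join by looking up each row.
    if (left == InputKind.SEEKABLE || right == InputKind.SEEKABLE) {
      return PhysicalJoin.SIDE_INPUT_LOOKUP_JOIN;
    }
    // Exactly one bounded side: feed the bounded side in as a side input.
    if ((left == InputKind.BOUNDED) != (right == InputKind.BOUNDED)) {
      return PhysicalJoin.SIDE_INPUT_JOIN;
    }
    // Both bounded or both unbounded: fall back to a grouping-based join.
    return PhysicalJoin.CO_GBK_JOIN;
  }

  public static void main(String[] args) {
    System.out.println(select(InputKind.BOUNDED, InputKind.UNBOUNDED));
    System.out.println(select(InputKind.UNBOUNDED, InputKind.UNBOUNDED));
    System.out.println(select(InputKind.SEEKABLE, InputKind.BOUNDED));
  }
}
```

With the decision made in one place, each physical operator would only need to build the PTransform for its own algorithm, which is the simplification the issue description asks for.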



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-6114) SQL join selection should be done in planner, not in expansion to PTransform

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6114?focusedWorklogId=298956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298956
 ]

ASF GitHub Bot logged work on BEAM-6114:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:56
Start Date: 21/Aug/19 19:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on pull request #9395: [BEAM-6114] 
Calcite Rules to Select Type of Join in BeamSQL
URL: https://github.com/apache/beam/pull/9395#discussion_r316373715
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java
 ##
 @@ -19,25 +19,38 @@
 
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSideInputLookupJoinRel;
 import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.logical.LogicalJoin;
 
-/** {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}. */
-public class BeamJoinRule extends ConverterRule {
-  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+public class BeamSideInputLookupJoinRule extends ConverterRule {
 
 Review comment:
   Add javadoc for the matching condition.
 



Issue Time Tracking
---

Worklog Id: (was: 298956)
Time Spent: 1h 50m  (was: 1h 40m)






[jira] [Comment Edited] (BEAM-6114) SQL join selection should be done in planner, not in expansion to PTransform

2019-08-21 Thread Rahul Patwari (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-6114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16912631#comment-16912631
 ] 

Rahul Patwari edited comment on BEAM-6114 at 8/21/19 7:48 PM:
--

I have asked a question on the Calcite dev list about the best practice for 
implementing our requirement. Here is the [thread 
link|https://lists.apache.org/thread.html/023fe5e87d87a3405d8580e4adc93ca935f3aa4797119317ed016e9c@%3Cdev.calcite.apache.org%3E].

It seems that they favor our current approach of identifying the join type 
during expansion to a PTransform.


was (Author: rahul8383):
I have asked a question in Calcite regarding the best practice to Implement our 
requirement. Here is the thread 
[link|[https://lists.apache.org/thread.html/023fe5e87d87a3405d8580e4adc93ca935f3aa4797119317ed016e9c@%3Cdev.calcite.apache.org%3E]].

It seems that they favor our current approach of identifying Join type in the 
expansion of PTransform.

> SQL join selection should be done in planner, not in expansion to PTransform
> 
>
> Key: BEAM-6114
> URL: https://issues.apache.org/jira/browse/BEAM-6114
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Rahul Patwari
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h




