[jira] [Work logged] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7819?focusedWorklogId=291824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291824
 ]

ASF GitHub Bot logged work on BEAM-7819:


Author: ASF GitHub Bot
Created on: 09/Aug/19 05:32
Start Date: 09/Aug/19 05:32
Worklog Time Spent: 10m 
  Work Description: matt-darwin commented on pull request #9232: 
[BEAM-7819] Python - parse PubSub message_id into attributes property
URL: https://github.com/apache/beam/pull/9232#discussion_r312333851
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/pubsub.py
 ##
 @@ -127,6 +130,10 @@ def _from_message(msg):
 """
 # Convert ScalarMapContainer to dict.
 attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+# Parse the PubSub message_id and add to attributes
+sysattribs = dict(message_id=msg.message_id)
 
 Review comment:
  Yes, I agree. This was merely done to fit with the existing documentation; 
however, I feel that making PubsubMessage mimic the protobuf definition makes 
more sense (and lets us add the publish_time attribute as well). In addition, 
due to the handling of Pub/Sub in Dataflow, the message_id is currently not 
exposed and so is simply an empty attribute at that point. I'm thinking of 
closing this pull request and updating the issue with a dependency on the 
Dataflow worker being amended (unfortunately I'm even worse at Java than 
Python!) 
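
For reference, a minimal sketch of that direction (field names follow the 
google.pubsub.v1.PubsubMessage protobuf; the constructor signature here is 
assumed for illustration, this is not the merged implementation):

{code}
from google.cloud import pubsub

class PubsubMessage(object):
  """Sketch: message_id and publish_time as members, mirroring the proto."""

  def __init__(self, data, attributes, message_id=None, publish_time=None):
    self.data = data
    self.attributes = attributes
    # Only populated on messages read from Pub/Sub; ignored when publishing.
    self.message_id = message_id
    self.publish_time = publish_time

  @staticmethod
  def _from_proto_str(proto_msg):
    msg = pubsub.types.pubsub_pb2.PubsubMessage()
    msg.ParseFromString(proto_msg)
    # Convert ScalarMapContainer to dict.
    attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
    return PubsubMessage(msg.data, attributes,
                         message_id=msg.message_id,
                         publish_time=msg.publish_time)
{code}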
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291824)
Time Spent: 3.5h  (was: 3h 20m)

> PubsubMessage message parsing is lacking non-attribute fields
> -
>
> Key: BEAM-7819
> URL: https://issues.apache.org/jira/browse/BEAM-7819
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> User reported issue: 
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained python eyes, I think if the 
> intention is to include the message id and the publish time in the attributes 
> attribute of the PubSubMessage type, then the protobuf mapping is missing 
> something:
> @staticmethod
> def _from_proto_str(proto_msg):
>   """Construct from serialized form of ``PubsubMessage``.
>
>   Args:
>     proto_msg: String containing a serialized protobuf of type
>       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>
>   Returns:
>     A new PubsubMessage object.
>   """
>   msg = pubsub.types.pubsub_pb2.PubsubMessage()
>   msg.ParseFromString(proto_msg)
>   # Convert ScalarMapContainer to dict.
>   attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>   return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed, as 
> they are separate from the attributes. Perhaps the PubsubMessage class needs 
> expanding to include these as attributes, or they would need adding to the 
> dictionary for attributes. This would only need doing for _from_proto_str, 
> as obviously they would not need to be populated when transmitting a message 
> to PubSub.
> My python is not great; I'm assuming the latter option would need to look 
> something like this?
> attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
> attributes.update({'message_id': msg.message_id,
>                    'publish_time': msg.publish_time})
> return PubsubMessage(msg.data, attributes)
> """



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291794
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 09/Aug/19 04:23
Start Date: 09/Aug/19 04:23
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#issuecomment-519771877
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291794)
Time Spent: 2h 40m  (was: 2.5h)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3489) Expose the message id of received messages within PubsubMessage

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3489?focusedWorklogId=291685&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291685
 ]

ASF GitHub Bot logged work on BEAM-3489:


Author: ASF GitHub Bot
Created on: 09/Aug/19 00:53
Start Date: 09/Aug/19 00:53
Worklog Time Spent: 10m 
  Work Description: udim commented on pull request #8370: [BEAM-3489] add 
PubSub messageId in PubsubMessage
URL: https://github.com/apache/beam/pull/8370#discussion_r312296771
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 ##
 @@ -884,7 +884,7 @@ public PubsubMessage getCurrent() throws 
NoSuchElementException {
   if (current == null) {
 throw new NoSuchElementException();
   }
-  return new PubsubMessage(current.elementBytes, current.attributes);
+  return new PubsubMessage(current.elementBytes, current.attributes, 
current.recordId);
 
 Review comment:
   nm, my mistake
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291685)
Time Spent: 6h 50m  (was: 6h 40m)

> Expose the message id of received messages within PubsubMessage
> ---
>
> Key: BEAM-3489
> URL: https://issues.apache.org/jira/browse/BEAM-3489
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Luke Cwik
>Assignee: Thinh Ha
>Priority: Minor
>  Labels: newbie, starter
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> This task is about passing forward the message id from the pubsub proto to 
> the java PubsubMessage.
> Add a message id field to PubsubMessage.
> Update the coder for PubsubMessage to encode the message id.
> Update the translation from the Pubsub proto message to the Dataflow message:
> https://github.com/apache/beam/blob/2e275264b21db45787833502e5e42907b05e28b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L976



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291680&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291680
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 09/Aug/19 00:45
Start Date: 09/Aug/19 00:45
Worklog Time Spent: 10m 
  Work Description: asf-ci commented on issue #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#issuecomment-519736704
 
 
   FAILURE

   --none--
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291680)
Time Spent: 2.5h  (was: 2h 20m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-3489) Expose the message id of received messages within PubsubMessage

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3489?focusedWorklogId=291666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291666
 ]

ASF GitHub Bot logged work on BEAM-3489:


Author: ASF GitHub Bot
Created on: 09/Aug/19 00:16
Start Date: 09/Aug/19 00:16
Worklog Time Spent: 10m 
  Work Description: udim commented on pull request #8370: [BEAM-3489] add 
PubSub messageId in PubsubMessage
URL: https://github.com/apache/beam/pull/8370#discussion_r312291519
 
 

 ##
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 ##
 @@ -884,7 +884,7 @@ public PubsubMessage getCurrent() throws 
NoSuchElementException {
   if (current == null) {
 throw new NoSuchElementException();
   }
-  return new PubsubMessage(current.elementBytes, current.attributes);
+  return new PubsubMessage(current.elementBytes, current.attributes, 
current.recordId);
 
 Review comment:
   Should that be `current.messageId`?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291666)
Time Spent: 6h 40m  (was: 6.5h)

> Expose the message id of received messages within PubsubMessage
> ---
>
> Key: BEAM-3489
> URL: https://issues.apache.org/jira/browse/BEAM-3489
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Luke Cwik
>Assignee: Thinh Ha
>Priority: Minor
>  Labels: newbie, starter
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> This task is about passing forward the message id from the pubsub proto to 
> the java PubsubMessage.
> Add a message id field to PubsubMessage.
> Update the coder for PubsubMessage to encode the message id.
> Update the translation from the Pubsub proto message to the Dataflow message:
> https://github.com/apache/beam/blob/2e275264b21db45787833502e5e42907b05e28b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L976



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7819?focusedWorklogId=291665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291665
 ]

ASF GitHub Bot logged work on BEAM-7819:


Author: ASF GitHub Bot
Created on: 09/Aug/19 00:15
Start Date: 09/Aug/19 00:15
Worklog Time Spent: 10m 
  Work Description: udim commented on pull request #9232: [BEAM-7819] 
Python - parse PubSub message_id into attributes property
URL: https://github.com/apache/beam/pull/9232#discussion_r312291239
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/pubsub.py
 ##
 @@ -127,6 +130,10 @@ def _from_message(msg):
 """
 # Convert ScalarMapContainer to dict.
 attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+# Parse the PubSub message_id and add to attributes
+sysattribs = dict(message_id=msg.message_id)
 
 Review comment:
   This would override a user-defined 'message_id' attribute. It would be 
better to make message_id a member of PubsubMessage.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291665)
Time Spent: 3h 20m  (was: 3h 10m)

> PubsubMessage message parsing is lacking non-attribute fields
> -
>
> Key: BEAM-7819
> URL: https://issues.apache.org/jira/browse/BEAM-7819
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> User reported issue: 
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained python eyes, I think if the 
> intention is to include the message id and the publish time in the attributes 
> attribute of the PubSubMessage type, then the protobuf mapping is missing 
> something:
> @staticmethod
> def _from_proto_str(proto_msg):
>   """Construct from serialized form of ``PubsubMessage``.
>
>   Args:
>     proto_msg: String containing a serialized protobuf of type
>       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>
>   Returns:
>     A new PubsubMessage object.
>   """
>   msg = pubsub.types.pubsub_pb2.PubsubMessage()
>   msg.ParseFromString(proto_msg)
>   # Convert ScalarMapContainer to dict.
>   attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>   return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed, as 
> they are separate from the attributes. Perhaps the PubsubMessage class needs 
> expanding to include these as attributes, or they would need adding to the 
> dictionary for attributes. This would only need doing for _from_proto_str, 
> as obviously they would not need to be populated when transmitting a message 
> to PubSub.
> My python is not great; I'm assuming the latter option would need to look 
> something like this?
> attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
> attributes.update({'message_id': msg.message_id,
>                    'publish_time': msg.publish_time})
> return PubsubMessage(msg.data, attributes)
> """



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7049) Merge multiple input to one BeamUnionRel

2019-08-08 Thread sridhar Reddy (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903422#comment-16903422
 ] 

sridhar Reddy commented on BEAM-7049:
-

[~amaliujia] Thanks for the update. I think I understand the issue and the 
solution.  I may contact you with further questions or an update early next 
week. 

> Merge multiple input to one BeamUnionRel
> 
>
> Key: BEAM-7049
> URL: https://issues.apache.org/jira/browse/BEAM-7049
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Rui Wang
>Assignee: sridhar Reddy
>Priority: Major
>
> BeamUnionRel assumes there are exactly two inputs and rejects more. So `a UNION b UNION c` 
> will have to be created as UNION(a, UNION(b, c)) and incur two shuffles. If 
> BeamUnionRel can handle multiple inputs, we will need only one shuffle.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7916) Add ElasticsearchIO query parameter to take a ValueProvider

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7916?focusedWorklogId=291658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291658
 ]

ASF GitHub Bot logged work on BEAM-7916:


Author: ASF GitHub Bot
Created on: 08/Aug/19 23:52
Start Date: 08/Aug/19 23:52
Worklog Time Spent: 10m 
  Work Description: oliverhenlich commented on issue #9285: [BEAM-7916] Add 
ElasticsearchIO query parameter to take a ValueProvider
URL: https://github.com/apache/beam/pull/9285#issuecomment-519727490
 
 
   Hi @aromanenko-dev, 
   Thanks for merging this! I just realized that it's been merged into `2.14.0` 
which has already been released. Will that make it into the next version? What 
is the next version? On the Jira it says `2.16.0`.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291658)
Time Spent: 1.5h  (was: 1h 20m)

> Add ElasticsearchIO query parameter to take a ValueProvider
> ---
>
> Key: BEAM-7916
> URL: https://issues.apache.org/jira/browse/BEAM-7916
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-elasticsearch
>Affects Versions: 2.14.0
>Reporter: Oliver Henlich
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.16.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> We need to be able to perform Elasticsearch queries that are dynamic. The 
> problem is that {{ElasticsearchIO.read().withQuery()}} only accepts a string, 
> which means the query must be known when the pipeline/Google Dataflow 
> template is built.
> It would be great if we could change the parameter on the {{withQuery()}} 
> method from {{String}} to {{ValueProvider}}.
> Pull request: https://github.com/apache/beam/pull/9285
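
As a hedged Python-side analogy of the same pattern (a sketch, not the Java 
ElasticsearchIO change itself; the option and DoFn names here are invented): a 
ValueProvider defers evaluation of a value until execution time, which is what 
a Dataflow template needs.

{code}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class QueryOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    # add_value_provider_argument makes --query resolvable at template runtime.
    parser.add_value_provider_argument('--query', type=str)

class RunQueryFn(beam.DoFn):
  def __init__(self, query_provider):
    self._query = query_provider  # a ValueProvider, not a plain string

  def process(self, element):
    # .get() is only legal at execution time, after the template is launched.
    yield (element, self._query.get())
{code}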



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7713) Migrate to "typing" module typing types in Beam typehints (on Py2 and Py3).

2019-08-08 Thread Udi Meiri (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903397#comment-16903397
 ] 

Udi Meiri commented on BEAM-7713:
-

The most important goal is to allow Beam users to use only typing-module hints.
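
As a rough illustration of what that means for a user (a sketch; it assumes 
the typing-to-Beam conversion this issue tracks is in place):

{code}
# Standard typing-module hints used directly on a DoFn, instead of Beam's own
# typehints types. Making annotations like these work everywhere is the goal.
from typing import List, Tuple

import apache_beam as beam

@beam.typehints.with_input_types(Tuple[str, int])
@beam.typehints.with_output_types(List[str])
class FormatFn(beam.DoFn):
  def process(self, element):
    key, count = element
    yield ['%s:%d' % (key, count)]
{code}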

> Migrate to "typing" module typing types in Beam typehints (on Py2 and Py3).
> ---
>
> Key: BEAM-7713
> URL: https://issues.apache.org/jira/browse/BEAM-7713
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Valentyn Tymofieiev
>Assignee: Udi Meiri
>Priority: Major
> Fix For: 2.16.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7696) If classpath directory contains a directory it causes exception in Spark runner staging

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7696?focusedWorklogId=291623&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291623
 ]

ASF GitHub Bot logged work on BEAM-7696:


Author: ASF GitHub Bot
Created on: 08/Aug/19 22:45
Start Date: 08/Aug/19 22:45
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #9019: [BEAM-7696] Prepare 
files to stage also in local master of spark runner.
URL: https://github.com/apache/beam/pull/9019#issuecomment-519714131
 
 
   I suggest that we re-use the code in Dataflow to automatically convert 
directory classpath entries into jars:
   
https://github.com/apache/beam/blob/3e97543d53cf30b7ac072e225d358e8436784220/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java#L256
   
   It will likely require some refactoring to make it available to more runners.
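
In outline, the conversion is just "zip each classpath directory and stage the 
zip". A hedged sketch of that idea in Python terms (function name is mine; the 
linked PackageUtil code is the Java original):

{code}
# A jar is a zip archive, so a classpath directory can be packaged into a
# temporary jar before staging.
import os
import tempfile
import zipfile

def package_directory_as_jar(directory):
  """Zips `directory` into a temporary .jar and returns the jar's path."""
  fd, jar_path = tempfile.mkstemp(suffix='.jar')
  os.close(fd)
  with zipfile.ZipFile(jar_path, 'w', zipfile.ZIP_DEFLATED) as jar:
    for root, _, files in os.walk(directory):
      for name in files:
        path = os.path.join(root, name)
        # Store entries relative to the directory root, matching jar layout.
        jar.write(path, os.path.relpath(path, directory))
  return jar_path
{code}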
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291623)
Time Spent: 4h 10m  (was: 4h)

> If classpath directory contains a directory it causes exception in Spark 
> runner staging
> ---
>
> Key: BEAM-7696
> URL: https://issues.apache.org/jira/browse/BEAM-7696
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Wang Yanlin
>Assignee: Wang Yanlin
>Priority: Minor
> Fix For: 2.15.0
>
> Attachments: addJar_exception.jpg, files_contains_dir.jpg
>
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Running the unit test SparkPipelineStateTest.testBatchPipelineRunningState in 
> IntelliJ IDEA on my Mac, I get an IllegalArgumentException in the console 
> output. I checked the source code and found that the result of 
> _PipelineResources.detectClassPathResourcesToStage_ contains a directory, 
> which is the cause of the exception.
> See the attached file 'addJar_exception.jpg' for details; the result of 
> _PipelineResources.detectClassPathResourcesToStage_ during debugging 
> is shown in the attached file 'files_contains_dir.jpg'.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-5822) Vendor bytebuddy

2019-08-08 Thread Luke Cwik (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik resolved BEAM-5822.
-
   Resolution: Fixed
Fix Version/s: 2.16.0

> Vendor bytebuddy
> 
>
> Key: BEAM-5822
> URL: https://issues.apache.org/jira/browse/BEAM-5822
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Kai Jiang
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-5822) Vendor bytebuddy

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5822?focusedWorklogId=291598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291598
 ]

ASF GitHub Bot logged work on BEAM-5822:


Author: ASF GitHub Bot
Created on: 08/Aug/19 22:05
Start Date: 08/Aug/19 22:05
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #9272: [BEAM-5822] 
Use vendored bytebuddy in sdks-java-core
URL: https://github.com/apache/beam/pull/9272
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291598)
Time Spent: 2h 20m  (was: 2h 10m)

> Vendor bytebuddy
> 
>
> Key: BEAM-5822
> URL: https://issues.apache.org/jira/browse/BEAM-5822
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Kai Jiang
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-2535) Allow explicit output time independent of firing specification for all timers

2019-08-08 Thread Luke Cwik (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903366#comment-16903366
 ] 

Luke Cwik commented on BEAM-2535:
-

I did a little of this work by defining the coder for timers so that they can 
also store payloads. See https://github.com/apache/beam/pull/5794 and 
BEAM-4659.

But feel free to make this work for non-portable pipelines as well.

> Allow explicit output time independent of firing specification for all timers
> -
>
> Key: BEAM-2535
> URL: https://issues.apache.org/jira/browse/BEAM-2535
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Shehzaad Nakhoda
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Today, we have insufficient control over the event time timestamp of elements 
> output from a timer callback.
> 1. For an event time timer, it is the timestamp of the timer itself.
>  2. For a processing time timer, it is the current input watermark at the 
> time of processing.
> But for both of these, we may want to reserve the right to output a 
> particular time, aka set a "watermark hold".
> A naive implementation of a {{TimerWithWatermarkHold}} would work for making 
> sure output is not droppable, but does not fully explain window expiration 
> and late data/timer dropping.
> In the natural interpretation of a timer as a feedback loop on a transform, 
> timers should be viewed as another channel of input, with a watermark, and 
> items on that channel _all need event time timestamps even if they are 
> delivered according to a different time domain_.
> I propose that the specification for when a timer should fire should be 
> separated (with nice defaults) from the specification of the event time of 
> resulting outputs. These timestamps will determine a side channel with a new 
> "timer watermark" that constrains the output watermark.
>  - We still need to fire event time timers according to the input watermark, 
> so that event time timers fire.
>  - Late data dropping and window expiration will be in terms of the minimum 
> of the input watermark and the timer watermark. In this way, whenever a timer 
> is set, the window is not going to be garbage collected.
>  - We will need to make sure we have a way to "wake up" a window once it is 
> expired; this may be as simple as exhausting the timer channel as soon as the 
> input watermark indicates expiration of a window
> This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It 
> seems reasonable to use timers as an implementation detail (e.g. in 
> runners-core utilities) without wanting any of this additional machinery. For 
> example, if there is no possibility of output from the timer callback.
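
To make the separation concrete, a sketch against the existing Python 
userstate API (hedged: the output-time declaration lives only in the comments, 
as the proposal; no such parameter exists today):

{code}
import apache_beam as beam
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer

class FlushAtWindowEnd(beam.DoFn):
  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

  def process(self, element,
              window=beam.DoFn.WindowParam,
              flush=beam.DoFn.TimerParam(FLUSH)):
    # Firing specification: run the callback when the watermark passes the
    # window end. Under the proposal, an output timestamp / watermark hold
    # for the callback's output could be declared here too, independently of
    # the firing time (hypothetical; no such parameter exists today).
    flush.set(window.end)
    yield element

  @on_timer(FLUSH)
  def on_flush(self):
    # Today the elements emitted here are stamped with the timer's own
    # timestamp (event-time timers) or the input watermark (processing-time
    # timers), which is exactly the limitation described above.
    yield 'flushed'
{code}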



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BEAM-7935) TypeError: can't pickle _cffi_backend.CDataGCP objects

2019-08-08 Thread Marappa Ganesan (JIRA)
Marappa Ganesan created BEAM-7935:
-

 Summary: TypeError: can't pickle _cffi_backend.CDataGCP objects
 Key: BEAM-7935
 URL: https://issues.apache.org/jira/browse/BEAM-7935
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Affects Versions: 2.13.0
 Environment: Linux (Debian GNU/Linux 9) (SMP x86_64 GNU/Linux)
 Python 2.7.13

Content of requirements.txt file
--
apache-beam==2.13.0
apache-beam[gcp]
google-cloud-core==1.0.3
google-cloud-iam
google-cloud-bigquery==1.6.1
PyYAML==3.13
six==1.12.0
pyOpenSSL
httplib2==0.12.0
google-apitools
google-api-python-client
google-cloud-storage==1.17.0


Reporter: Marappa Ganesan


Python Dataflow runner failed with the following error:

p.run().wait_until_finish()
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 416, in run
    pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 274, in dump_session
    dill.dump_session(file_path)
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/dill/_dill.py", line 393, in dump_session
    pickler.dump(main)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 149, in save_module
    return old_save_module(pickler, obj)
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/dill/_dill.py", line 1269, in save_module
    state=_main_dict)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 198, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/dill/_dill.py", line 902, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 198, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/dill/_dill.py", line 902, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 198, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/dill/_dill.py", line 902, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/marappan/sanmgcppy2env/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 198, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in 

[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291586&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291586
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 21:29
Start Date: 08/Aug/19 21:29
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312251383
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -84,7 +98,18 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
 
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
-    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
+    return begin
+        .apply("read", createKafkaRead().withoutMetadata())
+        .apply("in_format", getPTransformForInput())
+        .setRowSchema(getSchema());
+  }
+
+  public static void setNumberOfRecordsForRate(int numberOfRecordsForRate) {
 
 Review comment:
   The problem is we don't know the concrete user scenarios. For one, this 
table provider can also be used in the JDBC path, where users might not have 
access to the class to mutate it. The specific approach to making this 
configurable can depend on the use case (e.g. what if the user wants to use a 
different number for different topics, or to retrieve it from metadata 
somewhere?). Until we know that, I'd rather just make a non-static default and 
only override what we need for testing. This way we're not exposing a new API 
contract and can change the behavior if needed. If we expose an API, users 
will start using it, and we will have to support it. Without understanding 
potential use cases I'd rather not commit to that.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291586)
Time Spent: 2h 20m  (was: 2h 10m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291583&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291583
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 21:22
Start Date: 08/Aug/19 21:22
Worklog Time Spent: 10m 
  Work Description: riazela commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312249059
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -84,7 +98,18 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
 
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
-    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
+    return begin
+        .apply("read", createKafkaRead().withoutMetadata())
+        .apply("in_format", getPTransformForInput())
+        .setRowSchema(getSchema());
+  }
+
+  public static void setNumberOfRecordsForRate(int numberOfRecordsForRate) {
 
 Review comment:
   The reason I included this was to give the user the ability to set this 
before running the query and so change the behavior of the estimator. Since the 
user does not have access to the table when the optimization is happening, I 
made it static. 
   I'm wondering whether the current approach is fine, or whether it would be 
better to rename this variable defaultNumberOfRecords and then create a 
non-static member numberOfRecords. (We could still keep the setter for the 
static one so that the user can change the default.)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291583)
Time Spent: 2h 10m  (was: 2h)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291578&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291578
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 21:13
Start Date: 08/Aug/19 21:13
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312244420
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -138,6 +160,103 @@ public String getBootstrapServers() {
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+    if (rowCountStatistics == null) {
+      try {
+        rowCountStatistics =
+            BeamTableStatistics.createUnboundedTableStatistics(
+                this.computeRate(numberOfRecordsForRate));
+      } catch (Exception e) {
+        LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
+        rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+      }
+    }
+
+    return rowCountStatistics;
+  }
+
+  /**
+   * This method returns the estimate of the rate for this table using the last numberOfRecords
+   * tuples in each partition.
+   */
+  double computeRate(int numberOfRecords) throws NoEstimationException {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", bootstrapServers);
+    props.put("session.timeout.ms", "3");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+    return computeRate(consumer, numberOfRecords);
+  }
+
+  double computeRate(Consumer<String, String> consumer, int numberOfRecordsToCheck)
+      throws NoEstimationException {
+
+    Stream<TopicPartition> c =
+        getTopics().stream()
+            .map(consumer::partitionsFor)
+            .flatMap(Collection::stream)
+            .map(parInf -> new TopicPartition(parInf.topic(), parInf.partition()));
+    List<TopicPartition> topicPartitions = c.collect(Collectors.toList());
+
+    consumer.assign(topicPartitions);
+    // This will return the current offset of all the partitions that are assigned to the
+    // consumer (it will be the last record in those partitions). Note that each topic can
+    // have multiple partitions.
+    Map<TopicPartition, Long> offsets = consumer.endOffsets(topicPartitions);
+    long nParsSeen = 0;
+    for (TopicPartition par : topicPartitions) {
+      long offset = offsets.get(par);
+      nParsSeen = (offset == 0) ? nParsSeen : nParsSeen + 1;
+      consumer.seek(par, Math.max(0L, offset - numberOfRecordsToCheck));
 
 Review comment:
   Also, document the fact that messages are re-delivered in this case (if they 
are), so that we don't lose the data
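
For intuition, a rough Python analogue of the sampling approach in the diff 
above (a hedged sketch using the kafka-python client, with names of my own; 
not Beam code): seek each partition back N records from its end offset, read 
the tail, and derive records/second from the spread of record timestamps.

{code}
from kafka import KafkaConsumer, TopicPartition

def estimate_rate(bootstrap_servers, topic, n_records=100):
  consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
  partitions = [TopicPartition(topic, p)
                for p in consumer.partitions_for_topic(topic)]
  consumer.assign(partitions)
  # end_offsets gives the position just after the last record per partition.
  end_offsets = consumer.end_offsets(partitions)
  for tp in partitions:
    consumer.seek(tp, max(0, end_offsets[tp] - n_records))
  timestamps = []  # record timestamps in milliseconds
  for records in consumer.poll(timeout_ms=5000).values():
    timestamps.extend(r.timestamp for r in records)
  if len(timestamps) < 2 or max(timestamps) == min(timestamps):
    raise ValueError('not enough data for an estimate')
  return len(timestamps) * 1000.0 / (max(timestamps) - min(timestamps))
{code}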
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291578)
Time Spent: 1h 50m  (was: 1h 40m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291580&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291580
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 21:13
Start Date: 08/Aug/19 21:13
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312244696
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -138,6 +160,103 @@ public String getBootstrapServers() {
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+    if (rowCountStatistics == null) {
+      try {
+        rowCountStatistics =
+            BeamTableStatistics.createUnboundedTableStatistics(
+                this.computeRate(numberOfRecordsForRate));
+      } catch (Exception e) {
+        LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
+        rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+      }
+    }
+
+    return rowCountStatistics;
+  }
+
+  /**
+   * This method returns the estimate of the rate for this table using the last numberOfRecords
+   * tuples in each partition.
+   */
+  double computeRate(int numberOfRecords) throws NoEstimationException {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", bootstrapServers);
+    props.put("session.timeout.ms", "3");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
 Review comment:
   Since `createKafkaWrite` and `createKafkaRead` use `byte[]` serializers, 
should this path also use byte arrays?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291580)
Time Spent: 2h  (was: 1h 50m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291579&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291579
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 21:13
Start Date: 08/Aug/19 21:13
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312245649
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -84,7 +98,18 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
 
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
-    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
+    return begin
+        .apply("read", createKafkaRead().withoutMetadata())
+        .apply("in_format", getPTransformForInput())
+        .setRowSchema(getSchema());
+  }
+
+  public static void setNumberOfRecordsForRate(int numberOfRecordsForRate) {
 
 Review comment:
   This looks like a hack. Is it possible to keep `numberOfRecordsForRate` 
non-static and protected, and then create a setter only in the test table 
implementation to override it in tests?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291579)
Time Spent: 1h 50m  (was: 1h 40m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7678?focusedWorklogId=291575&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291575
 ]

ASF GitHub Bot logged work on BEAM-7678:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:58
Start Date: 08/Aug/19 20:58
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #9238: [BEAM-7678] Fixes bug 
in output element_type generation in Kv PipelineVisitor
URL: https://github.com/apache/beam/pull/9238#issuecomment-519685081
 
 
   @ecanzonieri I assigned the issue to you and closed it. I added you as a 
contributor; you should be able to assign JIRA issues to yourself from now on.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291575)
Time Spent: 2h 20m  (was: 2h 10m)

> typehints with_output_types annotation doesn't work for stateful DoFn 
> --
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.13.0
>Reporter: Enrico Canzonieri
>Assignee: Enrico Canzonieri
>Priority: Minor
> Fix For: 2.16.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The output types typehints seem to be ignored when using a stateful DoFn, but 
> the same typehint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code just defines a stateful DoFn in Python. The runner used is 
> the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  we do not take the typehints into account.
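
The workaround mentioned in the last paragraph, spelled out as a sketch 
(`Message` here is a placeholder standing in for the user's element type):

{code}
import apache_beam as beam
from apache_beam import typehints
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec

class Message(object):  # placeholder for the user's type
  pass

@typehints.with_output_types(Message)
class StatefulDoFn(beam.DoFn):
  COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

  def process(self, element, counter=beam.DoFn.StateParam(COUNTER_STATE)):
    (key, messages) = element
    newMessage = Message()
    return [newMessage]

  def infer_output_type(self, input_type):
    # Declaring the output type here sidesteps the ignored decorator.
    return typehints.List[Message]
{code}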



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn

2019-08-08 Thread Ahmet Altay (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay resolved BEAM-7678.
---
   Resolution: Fixed
Fix Version/s: 2.16.0

> typehints with_output_types annotation doesn't work for stateful DoFn 
> --
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.13.0
>Reporter: Enrico Canzonieri
>Assignee: Enrico Canzonieri
>Priority: Minor
> Fix For: 2.16.0
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The output types typehints seem to be ignored when using a stateful DoFn, but 
> the same typehint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code just defines a stateful DoFn in Python. The runner used is 
> the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  we do not take the typehints into account.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn

2019-08-08 Thread Ahmet Altay (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay reassigned BEAM-7678:
-

Assignee: Enrico Canzonieri

> typehints with_output_types annotation doesn't work for stateful DoFn 
> --
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.13.0
>Reporter: Enrico Canzonieri
>Assignee: Enrico Canzonieri
>Priority: Minor
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The output types typehints seem to be ignored when using a stateful DoFn, but 
> the same typehint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code just defines a stateful DoFn in Python. The runner used is 
> the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  we do not take the typehints into account.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BEAM-7934) Dataflow Python SDK logging: step_id is always empty string

2019-08-08 Thread James Hutchison (JIRA)
James Hutchison created BEAM-7934:
-

 Summary: Dataflow Python SDK logging: step_id is always empty 
string
 Key: BEAM-7934
 URL: https://issues.apache.org/jira/browse/BEAM-7934
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Affects Versions: 2.13.0
Reporter: James Hutchison


Using the Dataflow runner, log messages always show up in Stackdriver with 
step_id as the empty string, so filtering log messages by step doesn't work.
{code:java}
resource: {
  labels: {
job_id: "" 
job_name: "" 
project_id: "" 
region: "" 
step_id: "" 
  }
  type: "dataflow_step" 
}{code}
Another user posted in the old GitHub repo and appears to be seeing the same 
problem, based on their output:

[https://github.com/GoogleCloudPlatform/DataflowPythonSDK/issues/62]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291561
 ]

ASF GitHub Bot logged work on BEAM-7866:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:23
Start Date: 08/Aug/19 20:23
Worklog Time Spent: 10m 
  Work Description: y1chi commented on issue #9233:  [BEAM-7866] Fix python 
ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519673706
 
 
   Run Python MongoDBIO_IT
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291561)
Time Spent: 10.5h  (was: 10h 20m)

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.15.0
>
>  Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing the number of results in the 
> constructor, and then has each reader re-execute the whole query and take an 
> index sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently 
> to reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total is quadratic complexity
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be super slow and 2) it won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class 
> has extensive coverage with it, and the tests pass. This is because the tests 
> return the same results in the same order. I don't know how to catch this 
> automatically, and I don't know how to catch the performance issue 
> automatically, but these would all be important follow-up items after the 
> actual fix.
> CC: [~chamikara] as reviewer.
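
To make the failure mode above concrete, here is a minimal Python sketch (plain 
lists standing in for query results; the ids and the inserted document are the 
ones from the description, everything else is hypothetical, not Beam code):

{code:python}
docs = [10, 20, 30, 40, 50]        # document ids at split-planning time

# Split plan fixed up front: index ranges 0..2 and 3..5.
shard_a = sorted(docs)[0:3]        # reads 10, 20, 30

docs.append(25)                    # concurrent insert before shard B runs

shard_b = sorted(docs)[3:6]        # query re-executed: reads 30, 40, 50

print(sorted(shard_a + shard_b))   # [10, 20, 30, 30, 40, 50]
                                   # document 30 duplicated, 25 never read
{code}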



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291560&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291560
 ]

ASF GitHub Bot logged work on BEAM-7866:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:18
Start Date: 08/Aug/19 20:18
Worklog Time Spent: 10m 
  Work Description: y1chi commented on issue #9233:  [BEAM-7866] Fix python 
ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519671952
 
 
   > @y1chi I have patched my forked 2.14.0 with the fixes in this ticket. What 
would be a good performance test to verify this fix?
   
   Probably can try with your own data set similar to 
https://github.com/apache/beam/blob/75150df8bb02cdafc983305631506d70d6039f86/sdks/python/apache_beam/io/mongodbio_it_test.py
 ?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291560)
Time Spent: 10h 20m  (was: 10h 10m)

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.15.0
>
>  Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing number of results in constructor, and 
> then in each reader re-executing the whole query and getting an index 
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently 
> to reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total is quadratic complexity
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be super slow and 2) it won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class 
> has extensive coverage with it, and the tests pass. This is because the tests 
> return the same results in the same order. I don't know how to catch this 
> automatically, and I don't know how to catch the performance issue 
> automatically, but these would all be important follow-up items after the 
> actual fix.
> CC: [~chamikara] as reviewer.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7820) Add hot key detection to Dataflow Runner

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7820?focusedWorklogId=291559&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291559
 ]

ASF GitHub Bot logged work on BEAM-7820:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:13
Start Date: 08/Aug/19 20:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #9270: [BEAM-7820] 
HotKeyDetection
URL: https://github.com/apache/beam/pull/9270
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291559)
Time Spent: 3.5h  (was: 3h 20m)

> Add hot key detection to Dataflow Runner
> 
>
> Key: BEAM-7820
> URL: https://issues.apache.org/jira/browse/BEAM-7820
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-dataflow
>Reporter: Sam Rohde
>Assignee: Sam Rohde
>Priority: Minor
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> This tracks adding hot key detection in the Dataflow Runner. 
> There are times when a user's pipeline spuriously slows down due to hot keys. 
> During these times, users are unable to see under the hood what the 
> pipeline is doing. This adds hot key detection to show the user when their 
> pipeline has a hot key.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291555&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291555
 ]

ASF GitHub Bot logged work on BEAM-7866:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:05
Start Date: 08/Aug/19 20:05
Worklog Time Spent: 10m 
  Work Description: manuelaguilar commented on issue #9233:  [BEAM-7866] 
Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519667754
 
 
   @y1chi  I have patched my forked 2.14.0 with the fixes in this ticket. What 
would be a good performance test to verify this fix?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291555)
Time Spent: 10h 10m  (was: 10h)

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.15.0
>
>  Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing the number of results in the 
> constructor, and then having each reader re-execute the whole query and take 
> an index sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may get essentially random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently 
> with reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total amounts to quadratic complexity
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be super slow and 2) won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: the IO has 
> extensive coverage with it, and the tests pass. This is because the tests 
> return the same results in the same order. I don't know how to catch this 
> automatically, and I don't know how to catch the performance issue 
> automatically, but these would all be important follow-up items after the 
> actual fix.
> CC: [~chamikara] as reviewer.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291553
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312218957
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -138,6 +161,96 @@ public String getBootstrapServers() {
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+    if (rowCountStatistics == null) {
+      try {
+        rowCountStatistics =
+            BeamTableStatistics.createUnboundedTableStatistics(
+                this.computeRate(numberOfRecordsForRate));
+      } catch (Exception e) {
+        LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
+        rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+      }
+    }
+
+    return rowCountStatistics;
+  }
+
+  /**
+   * This method returns the estimate of the computeRate for this table using last
+   * numberOfRecords tuples in each partition.
+   */
+  double computeRate(int numberOfRecords) throws NoEstimationException {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", bootstrapServers);
+    props.put("session.timeout.ms", "3");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+    return computeRate(consumer, numberOfRecords);
+  }
+
+  double computeRate(Consumer<String, String> consumer, int numberOfRecordsToCheck)
+      throws NoEstimationException {
+
+    List<TopicPartition> topars = new ArrayList<>();
+    for (String name : getTopics()) {
+      List<PartitionInfo> parInfList = consumer.partitionsFor(name);
+      topars.addAll(
+          parInfList.stream()
+              .map(parInf -> new TopicPartition(name, parInf.partition()))
+              .collect(Collectors.toList()));
+    }
+
+    consumer.assign(topars);
+    Map<TopicPartition, Long> offsets = consumer.endOffsets(topars);
+    long nParsSeen = 0;
+    for (TopicPartition par : topars) {
+      long offset = offsets.get(par);
+      nParsSeen = (offset == 0) ? nParsSeen : nParsSeen + 1;
+      consumer.seek(par, Math.max(0L, offset - numberOfRecordsToCheck));
+    }
+
+    if (nParsSeen == 0) {
+      throw new NoEstimationException("There is no partition with messages in it.");
+    }
+
+    ConsumerRecords<String, String> records = consumer.poll(1000);
+
+    Map<Integer, Long> minTimeStamps = new HashMap<>();
+    long maxMinTimeStamp = 0;
+    for (ConsumerRecord<String, String> record : records) {
 
 Review comment:
   Comment this logic a bit
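
   To sketch what such a comment might cover: the code seeks each partition back 
by numberOfRecordsToCheck records, takes the latest per-partition minimum 
timestamp as a common starting point, and divides the count of newer records by 
the elapsed milliseconds. A small Python rendering of that arithmetic, with 
invented timestamps (illustration only, not part of the PR):

   ```python
   # Hypothetical tail-of-partition timestamps in ms, keyed by partition.
   per_partition = {0: [1000, 1500, 2000], 1: [1200, 1800, 2600]}

   # Latest "earliest seen" timestamp across partitions; records newer than
   # this fall inside every partition's observed window.
   max_min = max(min(ts) for ts in per_partition.values())   # 1200

   all_ts = [t for ts in per_partition.values() for t in ts]
   newer = [t for t in all_ts if t > max_min]                # 4 records
   span_ms = max(all_ts) - max_min                           # 2600 - 1200 = 1400

   rate_per_sec = len(newer) * 1000.0 / span_ms              # ~2.86 records/sec
   ```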
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291553)
Time Spent: 1.5h  (was: 1h 20m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291552&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291552
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312215432
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -138,6 +161,96 @@ public String getBootstrapServers() {
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+    if (rowCountStatistics == null) {
+      try {
+        rowCountStatistics =
+            BeamTableStatistics.createUnboundedTableStatistics(
+                this.computeRate(numberOfRecordsForRate));
+      } catch (Exception e) {
+        LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
+        rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+      }
+    }
+
+    return rowCountStatistics;
+  }
+
+  /**
+   * This method returns the estimate of the computeRate for this table using last
+   * numberOfRecords tuples in each partition.
+   */
+  double computeRate(int numberOfRecords) throws NoEstimationException {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", bootstrapServers);
+    props.put("session.timeout.ms", "3");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+    return computeRate(consumer, numberOfRecords);
+  }
+
+  double computeRate(Consumer<String, String> consumer, int numberOfRecordsToCheck)
+      throws NoEstimationException {
+
+    List<TopicPartition> topars = new ArrayList<>();
+    for (String name : getTopics()) {
+      List<PartitionInfo> parInfList = consumer.partitionsFor(name);
+      topars.addAll(
 
 Review comment:
   Is this different from the below code?
   
   ```java
   List<TopicPartition> topicPartitions =
       getTopics().stream()
           .flatMap(topic -> consumer.partitionsFor(topic).stream()
               .map(parInf -> new TopicPartition(topic, parInf.partition())))
           .collect(toList());
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291552)
Time Spent: 1h 20m  (was: 1h 10m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291549&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291549
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312218103
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaCSVTableIT.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.alibaba.fastjson.JSON;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * This is an integration test for KafkaCSVTable. There should be a kafka 
server running and the
+ * address should be passed to it.
+ */
+public class KafkaCSVTableIT {
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  //  @Rule public String unique_topic = "topic_"+System.currentTimeMillis();
+
+  private static final Schema TEST_TABLE_SCHEMA =
+  Schema.builder()
+  .addNullableField("order_id", Schema.FieldType.INT32)
+  .addNullableField("member_id", Schema.FieldType.INT32)
+  .addNullableField("item_name", Schema.FieldType.INT32)
+  .build();
+
+  @BeforeClass
+  public static void prepare() {
+PipelineOptionsFactory.register(KafkaOptions.class);
+  }
+
+  @Test
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void testFake2() throws BeamKafkaTable.NoEstimationException {
+KafkaOptions kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
+Table table =
+Table.builder()
+.name("kafka_table")
+.comment("kafka" + " table")
+.location("")
+.schema(
+Stream.of(
+Schema.Field.nullable("order_id", INT32),
+

[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291548&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291548
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312217848
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaCSVTableIT.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.alibaba.fastjson.JSON;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * This is an integration test for KafkaCSVTable. There should be a kafka 
server running and the
+ * address should be passed to it.
 
 Review comment:
   Create a jira and mention it here
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291548)
Time Spent: 1h  (was: 50m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> 

[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291554&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291554
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312219083
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -138,6 +161,96 @@ public String getBootstrapServers() {
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+    if (rowCountStatistics == null) {
+      try {
+        rowCountStatistics =
+            BeamTableStatistics.createUnboundedTableStatistics(
+                this.computeRate(numberOfRecordsForRate));
+      } catch (Exception e) {
+        LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
+        rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+      }
+    }
+
+    return rowCountStatistics;
+  }
+
+  /**
+   * This method returns the estimate of the computeRate for this table using last
+   * numberOfRecords tuples in each partition.
+   */
+  double computeRate(int numberOfRecords) throws NoEstimationException {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", bootstrapServers);
+    props.put("session.timeout.ms", "3");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+    return computeRate(consumer, numberOfRecords);
+  }
+
+  double computeRate(Consumer<String, String> consumer, int numberOfRecordsToCheck)
+      throws NoEstimationException {
+
+    List<TopicPartition> topars = new ArrayList<>();
+    for (String name : getTopics()) {
+      List<PartitionInfo> parInfList = consumer.partitionsFor(name);
+      topars.addAll(
+          parInfList.stream()
+              .map(parInf -> new TopicPartition(name, parInf.partition()))
+              .collect(Collectors.toList()));
+    }
+
+    consumer.assign(topars);
+    Map<TopicPartition, Long> offsets = consumer.endOffsets(topars);
+    long nParsSeen = 0;
+    for (TopicPartition par : topars) {
+      long offset = offsets.get(par);
+      nParsSeen = (offset == 0) ? nParsSeen : nParsSeen + 1;
+      consumer.seek(par, Math.max(0L, offset - numberOfRecordsToCheck));
+    }
+
+    if (nParsSeen == 0) {
+      throw new NoEstimationException("There is no partition with messages in it.");
+    }
+
+    ConsumerRecords<String, String> records = consumer.poll(1000);
+
+    Map<Integer, Long> minTimeStamps = new HashMap<>();
+    long maxMinTimeStamp = 0;
+    for (ConsumerRecord<String, String> record : records) {
+      if (!minTimeStamps.containsKey(record.partition())) {
+        minTimeStamps.put(record.partition(), record.timestamp());
+
+        nParsSeen--;
+        maxMinTimeStamp = Math.max(record.timestamp(), maxMinTimeStamp);
+        if (nParsSeen == 0) {
+          break;
+        }
+      }
+    }
+
+    int numberOfRecords = 0;
+    long maxTimeStamp = 0;
+    for (ConsumerRecord<String, String> record : records) {
+      maxTimeStamp = Math.max(maxTimeStamp, record.timestamp());
+      numberOfRecords =
+          record.timestamp() > maxMinTimeStamp ? numberOfRecords + 1 : numberOfRecords;
+    }
+
+    if (maxTimeStamp == maxMinTimeStamp) {
+      throw new NoEstimationException("Arrival time of all records are the same.");
+    }
+
+    return (numberOfRecords * 1000.) / ((double) maxTimeStamp - maxMinTimeStamp);
+  }
+
+  /** Will be thrown if we cannot estimate the rate for kafka table. */
+  public static class NoEstimationException extends Exception {
 
 Review comment:
   Can be package private?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291554)
Time Spent: 1h 40m  (was: 1.5h)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1h 40m
>  

[jira] [Comment Edited] (BEAM-1251) Python 3 Support

2019-08-08 Thread Valentyn Tymofieiev (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-1251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890504#comment-16890504
 ] 

Valentyn Tymofieiev edited comment on BEAM-1251 at 8/8/19 8:04 PM:
---

There have been several Python 3-related improvements to Beam since 2.11.0 
release. Notable changes:

2.12.0:
* Pre and Post-commit test coverage expanded to Python 3.5, 3.6, 3.7.
* Direct and Dataflow runners added support for Python 3.6 - 3.7.

2.13.0:
* Avro IO support enabled on Python 3.
* Datastore IO support enabled on Python 3.
* Bigquery IO support for BYTES datatype enabled on Python 3.

2.14.0 (release in progress): 
* Portable runner supports launching pipelines on Python 3 with Flink/Spark.
* Fixed handling of empty values of BYTES columns in Bigquery IO on Python 3. 
* Strengthened type inference on Python 3.6+. More restrictions defined by type 
annotations are now enforced.

Starting from 2.14.0, Beam will announce support of Python 3.6, 3.7 in 
[PyPi|https://pypi.org/project/apache-beam/]. 

Work continues to address known issues and strengthen Beam's Python 3 offering, 
in particular:
* Improve type annotations and inference on Python 3: BEAM-7060, BEAM-7712, 
BEAM-7713.
* Mitigate known pickling errors on Python 3: BEAM-6158, BEAM-7540. 
* Support syntactic constructs of Python 3 that are not yet supported by Beam: 
BEAM-5878. 

Contributions (and feedback!) are welcome, see: 
https://beam.apache.org/roadmap/python-sdk/#python-3-support for details.
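
As a concrete illustration of the stricter enforcement mentioned above, a 
declared input type that contradicts the inferred element type now fails at 
pipeline construction time (a hedged sketch; the transform and names are 
hypothetical):

{code:python}
import apache_beam as beam
from apache_beam.typehints import with_input_types, with_output_types

@with_input_types(int)
@with_output_types(str)
def format_id(x):
  return 'id-%d' % x

with beam.Pipeline() as p:
  (p
   | beam.Create(['a', 'b'])   # elements are str
   | beam.Map(format_id))      # declared int input raises a type-check error
{code}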



was (Author: tvalentyn):
There have been several Python 3-related improvements to Beam since 2.11.0 
release. Notable changes:

2.12.0:
* Pre and Post-commit test coverage expanded to Python 3.5, 3.6, 3.7.
* Direct and Dataflow runners added support for Python 3.6 - 3.7.

2.13.0:
* Avro IO support enabled on Python 3.
* Datastore IO support enabled on Python 3.
* Bigquery IO support for BYTES datatype enabled on Python 3.

2.14.0 (release in progress): 
* Portable runner supports launching pipelines on Python 3 with Flink/Spark.
* Fixed handling of empty values of BYTES columns in Bigquery IO on Python 3. 
* Strengthened type inference on Python 3.6+. More restrictions defined by type 
annotations are now enforced.

Starting from 2.14.0, Beam will announce support of Python 3.6, 3.7 in 
[PyPi|https://pypi.org/project/apache-beam/]. 

Work continues to address known issues and strengthen Beam's Python 3 offering, 
in particular:
* Improve type annotations and inference on Python 3: BEAM-7060, BEAM-7712, 
BEAM-7713.
* Mitigate known pickling errors on Python 3: BEAM-7540. 
* Support syntactic constructs of Python 3 that are not yet supported by Beam: 
BEAM-5878. 

Contributions (and feedback!) are welcome, see: 
https://beam.apache.org/roadmap/python-sdk/#python-3-support for details.


> Python 3 Support
> 
>
> Key: BEAM-1251
> URL: https://issues.apache.org/jira/browse/BEAM-1251
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Eyad Sibai
>Assignee: Valentyn Tymofieiev
>Priority: Major
>  Time Spent: 29h 20m
>  Remaining Estimate: 0h
>
> I have been trying to use google datalab with python3. As far as I can see, 
> there are several packages that google datalab depends on which do not yet 
> support python3. This is one of them.
> https://github.com/GoogleCloudPlatform/DataflowPythonSDK/issues/6



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291547&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291547
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312216682
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 ##
 @@ -138,6 +161,96 @@ public String getBootstrapServers() {
 
   @Override
   public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    return BeamTableStatistics.UNBOUNDED_UNKNOWN;
+    if (rowCountStatistics == null) {
+      try {
+        rowCountStatistics =
+            BeamTableStatistics.createUnboundedTableStatistics(
+                this.computeRate(numberOfRecordsForRate));
+      } catch (Exception e) {
+        LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
+        rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
+      }
+    }
+
+    return rowCountStatistics;
+  }
+
+  /**
+   * This method returns the estimate of the computeRate for this table using last
+   * numberOfRecords tuples in each partition.
+   */
+  double computeRate(int numberOfRecords) throws NoEstimationException {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", bootstrapServers);
+    props.put("session.timeout.ms", "3");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+    return computeRate(consumer, numberOfRecords);
+  }
+
+  double computeRate(Consumer<String, String> consumer, int numberOfRecordsToCheck)
+      throws NoEstimationException {
+
+    List<TopicPartition> topars = new ArrayList<>();
+    for (String name : getTopics()) {
+      List<PartitionInfo> parInfList = consumer.partitionsFor(name);
+      topars.addAll(
+          parInfList.stream()
+              .map(parInf -> new TopicPartition(name, parInf.partition()))
+              .collect(Collectors.toList()));
+    }
+
+    consumer.assign(topars);
+    Map<TopicPartition, Long> offsets = consumer.endOffsets(topars);
 
 Review comment:
   Add a comment what happens with the consumer and topic, what are the offsets
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291547)
Time Spent: 50m  (was: 40m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291551&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291551
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312217900
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaCSVTableIT.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.alibaba.fastjson.JSON;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * This is an integration test for KafkaCSVTable. There should be a kafka 
server running and the
+ * address should be passed to it.
+ */
+public class KafkaCSVTableIT {
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  //  @Rule public String unique_topic = "topic_"+System.currentTimeMillis();
 
 Review comment:
   remove
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291551)
Time Spent: 1h 20m  (was: 1h 10m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently, 

[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291550&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291550
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 20:04
Start Date: 08/Aug/19 20:04
Worklog Time Spent: 10m 
  Work Description: akedin commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#discussion_r312218335
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaCSVTableIT.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT32;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.alibaba.fastjson.JSON;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * This is an integration test for KafkaCSVTable. There should be a kafka 
server running and the
+ * address should be passed to it.
+ */
+public class KafkaCSVTableIT {
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  //  @Rule public String unique_topic = "topic_"+System.currentTimeMillis();
+
+  private static final Schema TEST_TABLE_SCHEMA =
+  Schema.builder()
+  .addNullableField("order_id", Schema.FieldType.INT32)
+  .addNullableField("member_id", Schema.FieldType.INT32)
+  .addNullableField("item_name", Schema.FieldType.INT32)
+  .build();
+
+  @BeforeClass
+  public static void prepare() {
+PipelineOptionsFactory.register(KafkaOptions.class);
+  }
+
+  @Test
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void testFake2() throws BeamKafkaTable.NoEstimationException {
+KafkaOptions kafkaOptions = pipeline.getOptions().as(KafkaOptions.class);
+Table table =
+Table.builder()
+.name("kafka_table")
+.comment("kafka" + " table")
+.location("")
+.schema(
+Stream.of(
+Schema.Field.nullable("order_id", INT32),
+

[jira] [Updated] (BEAM-7930) bundle_processor log spam using python SDK on dataflow runner

2019-08-08 Thread James Hutchison (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Hutchison updated BEAM-7930:
--
Description: 
When running my pipeline on dataflow, I can see in the stackdriver logs a large 
amount of spam for the following messages (note that the numbers in them change 
every message):
 * [INFO] (bundle_processor.create_operation) No unique name set for transform 
generatedPtransform-67
 * [INFO] (bundle_processor.create_operation) No unique name for transform -19
 * [ERROR] (bundle_processor.create) Missing required coder_id on grpc_port for 
-19; using deprecated fallback.

I tried running locally using the debugger and setting breakpoints on where 
these log messages originate using the direct runner and it never hit it, so I 
don't know specifically what is causing them.

I also tried using the logging module to change the threshold and also mocked 
out the logging attribute in the bundle_processor module to change the log 
level to CRITICAL and I still see the log messages.

The pipeline is a streaming pipeline that reads from two pubsub topics, merges 
the inputs and runs distinct on the inputs over each processing time window, 
fetches from an external service, does processing, and inserts into 
elasticsearch with failures going into bigquery. I notice the log messages seem 
to cluster, and they appear early on, before any other log messages in any of 
the other steps, so I wonder if this is coming from the pubsub read or 
windowing portion.

Expected behavior:
 * I don't expect to see these noisy log messages which seem to indicate 
something is wrong
 * The missing required coder_id message is at the ERROR log level so it 
pollutes the error logs. I would expect this to be at the WARNING or INFO level.

  was:
When running my pipeline on dataflow, I can see in the stackdriver logs a large 
amount of spam for the following messages (note that the numbers in them change 
every message):
 * [INFO] (bundle_processor.create_operation) No unique name set for transform 
generatedPtransform-67
 * [INFO] (bundle_processor.create_operation) No unique name for transform -19
 * [ERROR] (bundle_processor.create) Missing required coder_id on grpc_port for 
-19; using deprecated fallback.

I tried using a breakpoint on where these log messages originate using the 
direct runner and it never hit it, so I don't know specifically what is causing 
them.

I also tried using the logging module to change the threshold and also mocked 
out the logging attribute in the bundle_processor module to change the log 
level to CRITICAL and I still see the log messages.

The pipeline is a streaming pipeline that reads from two pubsub topics, merges 
the inputs and runs distinct on the inputs over each processing time window, 
fetches from an external service, does processing, and inserts into 
elasticsearch with failures going into bigquery. I notice the log messages seem 
to cluster and this appears early on before any other log messages in any of 
the other steps so I wonder if maybe this is coming from the pubsub read or 
windowing portion.

Expected behavior:
 * I don't expect to see these noisy log messages which seem to indicate 
something is wrong
 * The missing required coder_id message is at the ERROR log level so it 
pollutes the error logs. I would expect this to be at the WARNING or INFO level.
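
For reference, the kind of logger-level override described above would look 
like this sketch (module path inferred from the message prefixes; as the report 
notes, it does not silence the messages, since Dataflow workers configure 
logging separately):

{code:python}
import logging

# Attempted mitigation: raise the threshold on the suspected source module.
logging.getLogger(
    'apache_beam.runners.worker.bundle_processor').setLevel(logging.CRITICAL)
{code}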


> bundle_processor log spam using python SDK on dataflow runner
> -
>
> Key: BEAM-7930
> URL: https://issues.apache.org/jira/browse/BEAM-7930
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.13.0
>Reporter: James Hutchison
>Priority: Minor
>
> When running my pipeline on dataflow, I can see in the stackdriver logs a 
> large amount of spam for the following messages (note that the numbers in 
> them change every message):
>  * [INFO] (bundle_processor.create_operation) No unique name set for 
> transform generatedPtransform-67
>  * [INFO] (bundle_processor.create_operation) No unique name for transform -19
>  * [ERROR] (bundle_processor.create) Missing required coder_id on grpc_port 
> for -19; using deprecated fallback.
> I tried running locally using the debugger and setting breakpoints on where 
> these log messages originate using the direct runner and it never hit it, so 
> I don't know specifically what is causing them.
> I also tried using the logging module to change the threshold and also mocked 
> out the logging attribute in the bundle_processor module to change the log 
> level to CRITICAL and I still see the log messages.
> The pipeline is a streaming pipeline that reads from two pubsub topics, 
> merges the inputs and runs distinct on the inputs over each processing time 
> window, fetches from an external service, 

[jira] [Work logged] (BEAM-6907) Standardize Gradle projects/tasks structure for Python SDK

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6907?focusedWorklogId=291540&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291540
 ]

ASF GitHub Bot logged work on BEAM-6907:


Author: ASF GitHub Bot
Created on: 08/Aug/19 19:48
Start Date: 08/Aug/19 19:48
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #9277: [BEAM-6907] 
Reuse Python tarball in tox & dataflow integration tests
URL: https://github.com/apache/beam/pull/9277
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291540)
Time Spent: 4.5h  (was: 4h 20m)

> Standardize Gradle projects/tasks structure for Python SDK
> --
>
> Key: BEAM-6907
> URL: https://issues.apache.org/jira/browse/BEAM-6907
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> As Gradle parallelism was applied to Python tests and more Python versions 
> were added to tests, the way Gradle manages projects/tasks changed a lot. 
> Friction arose during the Gradle refactor since some projects defined separate 
> build scripts under the source directory. Thus, it will be better to 
> standardize how we use Gradle. This will help to manage Python 
> tests/builds/tasks across different versions and runners, and also make it 
> easy for people to learn/use/develop.
> In general, we may want to:
> - Apply parallel execution
> - Share common tasks
> - Centralize test related tasks
> - Have a clear Gradle structure for projects/tasks



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291539
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 19:43
Start Date: 08/Aug/19 19:43
Worklog Time Spent: 10m 
  Work Description: riazela commented on issue #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#issuecomment-519660611
 
 
   run sql postcommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291539)
Time Spent: 40m  (was: 0.5h)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7802) Expose a method to make an Avro-based PCollection into an Schema-based one

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7802?focusedWorklogId=291538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291538
 ]

ASF GitHub Bot logged work on BEAM-7802:


Author: ASF GitHub Bot
Created on: 08/Aug/19 19:42
Start Date: 08/Aug/19 19:42
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #9130: [BEAM-7802] Expose a 
method to make an Avro-based PCollection into an Schema-based one
URL: https://github.com/apache/beam/pull/9130#issuecomment-518675242
 
 
   I was starting to wonder whether we should somehow make every AvroCoder 
PCollection a SchemaCoder one for user friendliness, but I have my doubts about 
taking this 'implicit' approach; I am a bit worried about breaking backwards 
compatibility, but somehow it makes sense too, mmm... hard to decide/know.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291538)
Time Spent: 2h 20m  (was: 2h 10m)

> Expose a method to make an Avro-based PCollection into a Schema-based one
> --
>
> Key: BEAM-7802
> URL: https://issues.apache.org/jira/browse/BEAM-7802
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Beam can infer the Schema for an Avro-based PCollection by using the 
> `withBeamSchemas` method; however, if the user created a PCollection of Avro 
> objects or IndexedRecord/GenericRecord directly, they need to manually set the 
> schema (or coder). The idea is to expose a method in schema.AvroUtils to ease this.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7776) Implement Kubernetes setup/teardown code to gradle/jenkins tasks.

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7776?focusedWorklogId=291522&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291522
 ]

ASF GitHub Bot logged work on BEAM-7776:


Author: ASF GitHub Bot
Created on: 08/Aug/19 19:41
Start Date: 08/Aug/19 19:41
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #9116: [BEAM-7776] 
Stop Using Perfkit in IOIT
URL: https://github.com/apache/beam/pull/9116
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291522)
Time Spent: 11h 10m  (was: 11h)

> Implement Kubernetes setup/teardown code to gradle/jenkins tasks. 
> --
>
> Key: BEAM-7776
> URL: https://issues.apache.org/jira/browse/BEAM-7776
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Lukasz Gajowy
>Assignee: Lukasz Gajowy
>Priority: Major
>  Time Spent: 11h 10m
>  Remaining Estimate: 0h
>
> Currently this is done by Perfkit Benchmarker but can be easily moved to 
> Beam's codebase and a set of fine-grained gradle tasks. Those could then be 
> invoked by Jenkins, giving more flexibility to our tests and making Perfkit 
> totally obsolete in IOITs. 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Issue Comment Deleted] (BEAM-7931) Fix gradlew to allow offline builds

2019-08-08 Thread Daniel Gazineu (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Gazineu updated BEAM-7931:
-
Comment: was deleted

(was: This is an issue with the way Gradle Wrapper works 
[https://github.com/gradle/gradle/issues/4463]. 

A stopgap solution is to manually override distributionUrl on 
gradle/wrapper/gradle-wrapper.properties. You can replace it with a URL within 
your corporate network or even a file on your filesystem, e.g.:


{code:java}
sed -i 
's/https\\:\/\/services.gradle.org\/distributions/file\\:\/\/\/home\/username\/gradle-cache/g'
 gradle/wrapper/gradle-wrapper.properties
{code}
 )

> Fix gradlew to allow offline builds
> ---
>
> Key: BEAM-7931
> URL: https://issues.apache.org/jira/browse/BEAM-7931
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Reporter: Chad Dombrova
>Priority: Major
>
> When running `./gradlew` the first thing it tries to do is download gradle:
> {noformat}
> Downloading https://services.gradle.org/distributions/gradle-5.2.1-all.zip
> {noformat}
> There seems to be no way to skip this, even if the correct version of gradle 
> has already been downloaded and exists in the correct place.  The tool should 
> be smart enough to do a version check and skip downloading if it exists.  
> Unfortunately, the logic is wrapped up inside 
> gradle/wrapper/gradle-wrapper.jar so there does not seem to be any way to fix 
> this.  Where is the code for this jar?
> This is the first step of several to allow beam to be built offline.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7877) Change the log level when deleting unknown temporary files in FileBasedSink

2019-08-08 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee resolved BEAM-7877.
---
   Resolution: Fixed
Fix Version/s: 2.15.0

> Change the log level when deleting unknown temporary files in FileBasedSink
> ---
>
> Key: BEAM-7877
> URL: https://issues.apache.org/jira/browse/BEAM-7877
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Reporter: Heejong Lee
>Assignee: Heejong Lee
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently the log level is INFO. The newly proposed log level is WARNING, since 
> deleting unknown temporary files is a bad sign and sometimes leads to data 
> loss.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7744) LTS backport: Temporary directory for WriteOperation may not be unique in FileBasedSink

2019-08-08 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee resolved BEAM-7744.
---
Resolution: Fixed

> LTS backport: Temporary directory for WriteOperation may not be unique in 
> FileBasedSink
> --
>
> Key: BEAM-7744
> URL: https://issues.apache.org/jira/browse/BEAM-7744
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Reporter: Heejong Lee
>Assignee: Heejong Lee
>Priority: Major
> Fix For: 2.7.1
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Tracking BEAM-7689 LTS backport for 2.7.1 release



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-6683) Add an integration test suite for cross-language transforms for Flink runner

2019-08-08 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee resolved BEAM-6683.
---
   Resolution: Fixed
Fix Version/s: 2.15.0

> Add an integration test suite for cross-language transforms for Flink runner
> 
>
> Key: BEAM-6683
> URL: https://issues.apache.org/jira/browse/BEAM-6683
> Project: Beam
>  Issue Type: Test
>  Components: testing
>Reporter: Chamikara Jayalath
>Assignee: Heejong Lee
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 23.5h
>  Remaining Estimate: 0h
>
> We should add an integration test suite that covers the following.
> (1) Currently available Java IO connectors that do not use UDFs work for 
> Python SDK on Flink runner.
> (2) Currently available Python IO connectors that do not use UDFs work for 
> Java SDK on Flink runner.
> (3) Currently available Java/Python pipelines work in a scalable manner for 
> cross-language pipelines (for example, try 10GB, 100GB input for 
> textio/avroio for Java and Python). 
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (BEAM-7915) show cross-language validate runner Flink badge on github PR template

2019-08-08 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee closed BEAM-7915.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> show cross-language validate runner Flink badge on github PR template
> -
>
> Key: BEAM-7915
> URL: https://issues.apache.org/jira/browse/BEAM-7915
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Heejong Lee
>Assignee: Heejong Lee
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Show the cross-language validates runner Flink badge on the GitHub PR template.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Closed] (BEAM-7924) Failure in Python 2 postcommit: crossLanguagePythonJavaFlink

2019-08-08 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee closed BEAM-7924.
-
   Resolution: Fixed
Fix Version/s: 2.15.0

> Failure in Python 2 postcommit: crossLanguagePythonJavaFlink
> 
>
> Key: BEAM-7924
> URL: https://issues.apache.org/jira/browse/BEAM-7924
> Project: Beam
>  Issue Type: Bug
>  Components: test-failures
>Reporter: Udi Meiri
>Assignee: Heejong Lee
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> This seems to be the root cause:
> {code}
> 11:32:59 [grpc-default-executor-1] WARN pipeline_options.get_all_options - 
> Discarding unparseable args: [u'--app_name=None', 
> u'--shutdown_sources_on_final_watermark', u'--flink_master=[auto]', 
> u'--direct_runner_use_stacked_bundle', u'--options_id=1', 
> u'--fail_on_checkpointing_errors', u'--enable_metrics', 
> u'--pipeline_type_check', u'--parallelism=2'] 
> 11:32:59 [grpc-default-executor-1] INFO sdk_worker_main.main - Python sdk 
> harness started with pipeline_options: {'runner': u'None', 'experiments': 
> [u'worker_threads=100', u'beam_fn_api'], 'environment_cache_millis': 
> u'1', 'sdk_location': u'container', 'job_name': 
> u'BeamApp-root-0807183253-57a72c22', 'save_main_session': True, 'region': 
> u'us-central1', 'sdk_worker_parallelism': u'1'}
> 11:32:59 [grpc-default-executor-1] ERROR sdk_worker_main.main - Python sdk 
> harness failed: 
> 11:32:59 Traceback (most recent call last):
> 11:32:59   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
>  line 153, in main
> 11:32:59 sdk_pipeline_options.view_as(pipeline_options.ProfilingOptions))
> 11:32:59   File 
> "/usr/local/lib/python2.7/site-packages/apache_beam/options/pipeline_options.py",
>  line 334, in __getattr__
> 11:32:59 (type(self).__name__, name))
> 11:32:59 AttributeError: 'PipelineOptions' object has no attribute 
> 'ProfilingOptions' 
> {code}
> https://builds.apache.org/job/beam_PostCommit_Python2_PR/58/console



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7819?focusedWorklogId=291511&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291511
 ]

ASF GitHub Bot logged work on BEAM-7819:


Author: ASF GitHub Bot
Created on: 08/Aug/19 19:12
Start Date: 08/Aug/19 19:12
Worklog Time Spent: 10m 
  Work Description: matt-darwin commented on issue #9232: [BEAM-7819] 
Python - parse PubSub message_id into attributes property
URL: https://github.com/apache/beam/pull/9232#issuecomment-519650640
 
 
   Run Python Dataflow ValidatesRunner
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291511)
Time Spent: 3h 10m  (was: 3h)

> PubsubMessage message parsing is lacking non-attribute fields
> -
>
> Key: BEAM-7819
> URL: https://issues.apache.org/jira/browse/BEAM-7819
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Reporter: Ahmet Altay
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> User reported issue: 
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained python eyes, I think if the 
> intention is to include the message id and the publish time in the attributes 
> attribute of the PubSubMessage type, then the protobuf mapping is missing 
> something:-
> @staticmethod
> def _from_proto_str(proto_msg):
>   """Construct from serialized form of ``PubsubMessage``.
>
>   Args:
>     proto_msg: String containing a serialized protobuf of type
>       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>
>   Returns:
>     A new PubsubMessage object.
>   """
>   msg = pubsub.types.pubsub_pb2.PubsubMessage()
>   msg.ParseFromString(proto_msg)
>   # Convert ScalarMapContainer to dict.
>   attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>   return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:-
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed as 
> they are separate from the attributes. Perhaps the PubsubMessage class needs 
> expanding to include these as attributes, or they would need adding to the 
> dictionary for attributes. This would only need doing for the _from_proto_str 
> as obviously they would not need to be populated when transmitting a message 
> to PubSub.
> My python is not great, I'm assuming the latter option would need to look 
> something like this?
> attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
> attributes.update({'message_id': msg.message_id, 'publish_time': 
> msg.publish_time})
> return PubsubMessage(msg.data, attributes)
> """



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7931) Fix gradlew to allow offline builds

2019-08-08 Thread Daniel Gazineu (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903246#comment-16903246
 ] 

Daniel Gazineu commented on BEAM-7931:
--

This is an issue with the way Gradle Wrapper works 
[https://github.com/gradle/gradle/issues/4463]. 

A stopgap solution is to manually override distributionUrl on 
gradle/wrapper/gradle-wrapper.properties. You can replace it with a URL within 
your corporate network or even a file on your filesystem, e.g.:


{code:java}
sed -i 
's/https\\:\/\/services.gradle.org\/distributions/file\\:\/\/\/home\/username\/gradle-cache/g'
 gradle/wrapper/gradle-wrapper.properties
{code}
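
The effect of that sed command is to rewrite the distributionUrl entry; the
resulting properties file would contain something like the following (the local
path is of course just an example):

{noformat}
# gradle/wrapper/gradle-wrapper.properties (after the override)
distributionUrl=file\:///home/username/gradle-cache/gradle-5.2.1-all.zip
{noformat}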
 

> Fix gradlew to allow offline builds
> ---
>
> Key: BEAM-7931
> URL: https://issues.apache.org/jira/browse/BEAM-7931
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Reporter: Chad Dombrova
>Priority: Major
>
> When running `./gradlew` the first thing it tries to do is download gradle:
> {noformat}
> Downloading https://services.gradle.org/distributions/gradle-5.2.1-all.zip
> {noformat}
> There seems to be no way to skip this, even if the correct version of gradle 
> has already been downloaded and exists in the correct place.  The tool should 
> be smart enough to do a version check and skip downloading if it exists.  
> Unfortunately, the logic is wrapped up inside 
> gradle/wrapper/gradle-wrapper.jar so there does not seem to be any way to fix 
> this.  Where is the code for this jar?
> This is the first step of several to allow beam to be built offline.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291507
 ]

ASF GitHub Bot logged work on BEAM-7866:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:56
Start Date: 08/Aug/19 18:56
Worklog Time Spent: 10m 
  Work Description: y1chi commented on pull request #9233:  [BEAM-7866] Fix 
python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312194199
 
 

 ##
 File path: sdks/python/apache_beam/io/mongodbio_test.py
 ##
 @@ -210,6 +289,47 @@ def test_write_to_mongodb_with_generated_id(self, mock_client):
     return_value.bulk_write.assert_called_with(expected_update)
 
 
+class ObjectIdHelperTest(TestCase):
+  def test_conversion(self):
+    test_cases = [
+        (objectid.ObjectId('000000000000000000000000'), 0),
+        (objectid.ObjectId('000000000000000100000000'), 2**32),
+        (objectid.ObjectId('0000000000000000ffffffff'), 2**32 - 1),
+        (objectid.ObjectId('000000010000000000000000'), 2**64),
+        (objectid.ObjectId('00000000ffffffffffffffff'), 2**64 - 1),
+        (objectid.ObjectId('ffffffffffffffffffffffff'), 2**96 - 1),
+    ]
+    for (id, number) in test_cases:
+      self.assertEqual(id, _ObjectIdHelper.int_to_id(number))
+      self.assertEqual(number, _ObjectIdHelper.id_to_int(id))
+
+    # random tests
+    for _ in range(100):
+      id = objectid.ObjectId()
+      if sys.version_info[0] < 3:
+        number = int(id.binary.encode('hex'), 16)
+      else:  # PY3
+        number = int(id.binary.hex(), 16)
+      self.assertEqual(id, _ObjectIdHelper.int_to_id(number))
+      self.assertEqual(number, _ObjectIdHelper.id_to_int(id))
+
+  def test_increment_id(self):
+    test_cases = [
+        (objectid.ObjectId('000000000000000100000000'),
+         objectid.ObjectId('0000000000000000ffffffff')),
+        (objectid.ObjectId('000000010000000000000000'),
+         objectid.ObjectId('00000000ffffffffffffffff')),
+    ]
+    for (first, second) in test_cases:
+      self.assertEqual(second, _ObjectIdHelper.increment_id(first, -1))
+      self.assertEqual(first, _ObjectIdHelper.increment_id(second, 1))
+
+    for _ in range(100):
+      id = objectid.ObjectId()
+      self.assertLess(id, _ObjectIdHelper.increment_id(id, 1))
+      self.assertGreater(id, _ObjectIdHelper.increment_id(id, -1))
 
 Review comment:
   done.
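
The tests above pin down the conversion contract: an ObjectId's 12-byte binary
value is interpreted as a 96-bit big-endian integer. A minimal Python 3 sketch
of a helper that would satisfy these tests (illustrative only; the real
`_ObjectIdHelper` lives in `apache_beam/io/mongodbio.py`):

{code:python}
from bson import objectid

class _ObjectIdHelperSketch(object):
  @staticmethod
  def id_to_int(oid):
    # 12-byte big-endian binary -> 96-bit integer.
    return int.from_bytes(oid.binary, byteorder='big')

  @staticmethod
  def int_to_id(number):
    # 96-bit integer -> ObjectId with the corresponding 12-byte binary.
    return objectid.ObjectId(number.to_bytes(12, byteorder='big').hex())

  @staticmethod
  def increment_id(oid, inc):
    # Add inc (possibly negative) in integer space, e.g. for range splitting.
    return _ObjectIdHelperSketch.int_to_id(
        _ObjectIdHelperSketch.id_to_int(oid) + inc)
{code}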
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291507)
Time Spent: 10h  (was: 9h 50m)

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.15.0
>
>  Time Spent: 10h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing the number of results in the constructor, 
> and then has each reader re-execute the whole query and take an index 
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently 
> to reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total amounts to quadratic complexity
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be super slow and 2) it won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class 
> has extensive 

[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291506&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291506
 ]

ASF GitHub Bot logged work on BEAM-7866:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:56
Start Date: 08/Aug/19 18:56
Worklog Time Spent: 10m 
  Work Description: y1chi commented on pull request #9233:  [BEAM-7866] Fix 
python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312194142
 
 

 ##
 File path: sdks/python/apache_beam/io/mongodbio_test.py
 ##
 @@ -72,72 +176,47 @@ def test_split(self, mock_client):
 
   @mock.patch('apache_beam.io.mongodbio.MongoClient')
   def test_dynamic_work_rebalancing(self, mock_client):
-    splits = list(self.mongo_source.split(desired_bundle_size=3000))
-    mock_client.return_value.__enter__.return_value.__getitem__.return_value \
-      .__getitem__.return_value.find.return_value = [{'x': 1}, {'x': 2},
-                                                     {'x': 3}, {'x': 4},
-                                                     {'x': 5}]
+    mock_client.return_value = _MockMongoClient(self._docs)
+    splits = list(
+        self.mongo_source.split(desired_bundle_size=3000 * 1024 * 1024))
     assert len(splits) == 1
     source_test_utils.assert_split_at_fraction_exhaustive(
         splits[0].source, splits[0].start_position, splits[0].stop_position)
 
-  @mock.patch('apache_beam.io.mongodbio.OffsetRangeTracker')
-  def test_get_range_tracker(self, mock_tracker):
-    self.mongo_source.get_range_tracker(None, None)
-    mock_tracker.assert_called_with(0, 5)
-    self.mongo_source.get_range_tracker(10, 20)
-    mock_tracker.assert_called_with(10, 20)
+  @mock.patch('apache_beam.io.mongodbio.MongoClient')
+  def test_get_range_tracker(self, mock_client):
+    mock_client.return_value = _MockMongoClient(self._docs)
+    self.assertIsInstance(self.mongo_source.get_range_tracker(None, None),
+                          _ObjectIdRangeTracker)
 
   @mock.patch('apache_beam.io.mongodbio.MongoClient')
   def test_read(self, mock_client):
 mock_tracker = mock.MagicMock()
-mock_tracker.try_claim.return_value = True
-mock_tracker.start_position.return_value = 0
-mock_tracker.stop_position.return_value = 2
+mock_tracker.start_position.return_value = self._ids[0]
 
 Review comment:
   done.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291506)
Time Spent: 9h 50m  (was: 9h 40m)

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.15.0
>
>  Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing the number of results in the constructor, 
> and then has each reader re-execute the whole query and take an index 
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently 
> to reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total amounts to quadratic complexity
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be super slow and 2) it won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class 
> has extensive coverage with it, and the tests pass. This is 

[jira] [Closed] (BEAM-7675) Unify test suite configuration structure across Py2 and Py 3 suites

2019-08-08 Thread Mark Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Liu closed BEAM-7675.
--

> Unify test suite configuration structure across Py2 and Py 3 suites
> ---
>
> Key: BEAM-7675
> URL: https://issues.apache.org/jira/browse/BEAM-7675
> Project: Beam
>  Issue Type: Sub-task
>  Components: testing
>Reporter: Valentyn Tymofieiev
>Assignee: Mark Liu
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Currently most of Py27 test suites are defined in 
> https://github.com/apache/beam/blob/master/sdks/python/build.gradle while 
> Python 3.x suites are defined in runner-specific folders, for example 
> https://github.com/apache/beam/blob/master/sdks/python/test-suites/direct/py35/build.gradle.
>  
>  
> This may cause confusion (where to add a new suite) and divergence of test 
> suites; for example, Direct runner postcommit suites on Python 2 and Python 3 
> are defined inconsistently: https://issues.apache.org/jira/browse/BEAM-7674 
> We should unify test structure to avoid confusion and consider reducing 
> duplication of Gradle code if/when it makes sense to do so.   



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7678?focusedWorklogId=291504&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291504
 ]

ASF GitHub Bot logged work on BEAM-7678:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:45
Start Date: 08/Aug/19 18:45
Worklog Time Spent: 10m 
  Work Description: ecanzonieri commented on issue #9238: [BEAM-7678] Fixes 
bug in output element_type generation in Kv PipelineVisitor
URL: https://github.com/apache/beam/pull/9238#issuecomment-519641717
 
 
   @aaltay do you think we can close the ticket BEAM-7678 now? Feel free to 
assign it to me; I don't think I have permission in the Beam Jira to close or 
assign tickets to myself.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291504)
Time Spent: 2h 10m  (was: 2h)

> typehints with_output_types annotation doesn't work for stateful DoFn 
> --
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.13.0
>Reporter: Enrico Canzonieri
>Priority: Minor
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The output type typehints seem to be ignored when using a stateful DoFn, but 
> the same typehint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code just defines a stateful DoFn for Python. The runner used 
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  we do not take the typehints into account.
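For reference, the workaround mentioned in the description reads roughly as
below; this is a sketch reusing the hypothetical {{Message}} type from the
example above, not code from the Beam repository:

{code:python}
@typehints.with_output_types(Message)
class StatefulDoFn(DoFn):
  COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

  def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
    (key, messages) = element
    return [Message()]

  def infer_output_type(self, input_type):
    # Workaround described above: declare the output type explicitly so the
    # intended coder for Message is used instead of the pickle fallback.
    return typehints.List[Message]
{code}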



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BEAM-7932) GcsFileSystem should validate directories and file names for \n chars

2019-08-08 Thread Daniel Mills (JIRA)
Daniel Mills created BEAM-7932:
--

 Summary: GcsFileSystem should validate directories and file names 
for \n chars
 Key: BEAM-7932
 URL: https://issues.apache.org/jira/browse/BEAM-7932
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Reporter: Daniel Mills


Newlines are not valid characters in GCS object names 
([https://cloud.google.com/storage/docs/naming]), but we don't currently 
validate for this.  This leads to confusing errors if users accidentally 
include a trailing newline char on a string being passed into an IO.

The particular place where this came up was with FileIO using AvroIO.sink, 
where the directory passed to to() had a trailing newline, but it looks like 
this will be a problem generally with FileIO + GCS. 
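
An illustrative check of the kind the issue asks for might look like the
following; the helper name is hypothetical, and the real validation would live
inside GcsFileSystem on the Java side:

{code:python}
def validate_gcs_object_name(name):
  # Hypothetical illustration: GCS object names must not contain newlines
  # (https://cloud.google.com/storage/docs/naming), so fail fast with a
  # clear message instead of a confusing downstream error.
  if '\n' in name or '\r' in name:
    raise ValueError(
        'Invalid GCS object name %r: contains a newline character' % name)
  return name

validate_gcs_object_name('output/results.avro')      # ok
# validate_gcs_object_name('output/results.avro\n')  # raises ValueError
{code}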



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (BEAM-7933) Adding timeout to JobServer grpc calls

2019-08-08 Thread Enrico Canzonieri (JIRA)
Enrico Canzonieri created BEAM-7933:
---

 Summary: Adding timeout to JobServer grpc calls
 Key: BEAM-7933
 URL: https://issues.apache.org/jira/browse/BEAM-7933
 Project: Beam
  Issue Type: Improvement
  Components: sdk-py-core
Affects Versions: 2.14.0
Reporter: Enrico Canzonieri


grpc calls to the JobServer from the Python SDK do not have timeouts. That 
means that the call to pipeline.run() could hang forever if the JobServer is not 
running (or fails to start).
E.g. 
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
 the call to Prepare() doesn't provide any timeout value and the same applies 
to other JobServer requests.
As part of this ticket we could add a default timeout of 60 seconds, matching 
the usual default timeout of an HTTP client.
Additionally, we could consider adding a --job-server-request-timeout to the 
[PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
 class to be used in the JobServer interactions inside portable_runner.py.
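
To make the proposal concrete, a minimal sketch of a guarded call follows. The
--job-server-request-timeout option does not exist yet, and `job_service`,
`job_name` and `proto_pipeline` stand in for objects already available in
portable_runner.py; gRPC's generated Python stubs accept a per-call timeout in
seconds:

{code:python}
# Hypothetical sketch of a Prepare() call with the proposed timeout.
timeout = portable_options.job_server_request_timeout or 60  # proposed flag
prepare_response = job_service.Prepare(
    beam_job_api_pb2.PrepareJobRequest(job_name=job_name,
                                       pipeline=proto_pipeline),
    timeout=timeout)  # raises grpc.RpcError (DEADLINE_EXCEEDED) instead of hanging
{code}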



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291498
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:37
Start Date: 08/Aug/19 18:37
Worklog Time Spent: 10m 
  Work Description: akedin commented on issue #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#issuecomment-519638774
 
 
   run sql postcommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291498)
Time Spent: 0.5h  (was: 20m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-6907) Standardize Gradle projects/tasks structure for Python SDK

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6907?focusedWorklogId=291496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291496
 ]

ASF GitHub Bot logged work on BEAM-6907:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:36
Start Date: 08/Aug/19 18:36
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #9277: [BEAM-6907] Reuse 
Python tarball in tox & dataflow integration tests
URL: https://github.com/apache/beam/pull/9277#issuecomment-519638427
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291496)
Time Spent: 4h 10m  (was: 4h)

> Standardize Gradle projects/tasks structure for Python SDK
> --
>
> Key: BEAM-6907
> URL: https://issues.apache.org/jira/browse/BEAM-6907
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> As Gradle parallelism was applied to Python tests and more Python versions were 
> added to tests, the way Gradle manages projects/tasks changed a lot. Friction was 
> generated during the Gradle refactor, since some projects defined separate build 
> scripts under the source directory. Thus, it will be better to standardize how we 
> use Gradle. This will help to manage Python tests/builds/tasks across 
> different versions and runners, and also make it easy for people to learn/use/develop.
> In general, we may want to:
> - Apply parallel execution
> - Share common tasks
> - Centralize test related tasks
> - Have a clear Gradle structure for projects/tasks



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-6907) Standardize Gradle projects/tasks structure for Python SDK

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6907?focusedWorklogId=291497&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291497
 ]

ASF GitHub Bot logged work on BEAM-6907:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:36
Start Date: 08/Aug/19 18:36
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #9277: [BEAM-6907] Reuse 
Python tarball in tox & dataflow integration tests
URL: https://github.com/apache/beam/pull/9277#issuecomment-519638476
 
 
   Run Python 2 PostCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291497)
Time Spent: 4h 20m  (was: 4h 10m)

> Standardize Gradle projects/tasks structure for Python SDK
> --
>
> Key: BEAM-6907
> URL: https://issues.apache.org/jira/browse/BEAM-6907
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> As Gradle parallelism was applied to Python tests and more Python versions were 
> added to tests, the way Gradle manages projects/tasks changed a lot. Friction was 
> generated during the Gradle refactor, since some projects defined separate build 
> scripts under the source directory. Thus, it will be better to standardize how we 
> use Gradle. This will help to manage Python tests/builds/tasks across 
> different versions and runners, and also make it easy for people to learn/use/develop.
> In general, we may want to:
> - Apply parallel execution
> - Share common tasks
> - Centralize test related tasks
> - Have a clear Gradle structure for projects/tasks



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7861) Make it easy to change multi-process and multi-thread mode for Python Direct runners

2019-08-08 Thread Hannah Jiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannah Jiang updated BEAM-7861:
---
Status: Open  (was: Triage Needed)

> Make it easy to change multi-process and multi-thread mode for Python Direct 
> runners
> 
>
> Key: BEAM-7861
> URL: https://issues.apache.org/jira/browse/BEAM-7861
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>
> BEAM-3645 makes it possible to run a map task in parallel.
> However, users need to change the runner when switching between multithreading 
> and multiprocessing mode.
> We want to add a flag (e.g. --use-multiprocess) to make the switch easy 
> without changing the runner each time.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7861) Make it easy to change between multi-process and multi-thread mode for Python Direct runners

2019-08-08 Thread Hannah Jiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannah Jiang updated BEAM-7861:
---
Summary: Make it easy to change between multi-process and multi-thread mode 
for Python Direct runners  (was: Make it easy to change multi-process and 
multi-thread mode for Python Direct runners)

> Make it easy to change between multi-process and multi-thread mode for Python 
> Direct runners
> 
>
> Key: BEAM-7861
> URL: https://issues.apache.org/jira/browse/BEAM-7861
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>
> BEAM-3645 makes it possible to run a map task in parallel.
> However, users need to change the runner when switching between multithreading 
> and multiprocessing mode.
> We want to add a flag (e.g. --use-multiprocess) to make the switch easy 
> without changing the runner each time.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7888) Test Multi Process Direct Runner With Largish Data

2019-08-08 Thread Hannah Jiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannah Jiang updated BEAM-7888:
---
Status: Open  (was: Triage Needed)

> Test Multi Process Direct Runner With Largish Data
> --
>
> Key: BEAM-7888
> URL: https://issues.apache.org/jira/browse/BEAM-7888
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Hannah Jiang
>Priority: Major
>
> Filing this as a tracker.
> We can test the multiprocess runner with a largish amount of data, to the extent 
> that we can do this on Jenkins. This will serve 2 purposes:
> - Find out issues related to multiprocessing. It would be easier to find 
> rare issues when running over non-trivial data.
> - Serve as a baseline (if not a benchmark) to understand the limits of the 
> multiprocess runner.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7861) Make it easy to change multi-process and multi-thread mode for Python Direct runners

2019-08-08 Thread Hannah Jiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannah Jiang updated BEAM-7861:
---
Issue Type: Sub-task  (was: Improvement)
Parent: BEAM-3645

> Make it easy to change multi-process and multi-thread mode for Python Direct 
> runners
> 
>
> Key: BEAM-7861
> URL: https://issues.apache.org/jira/browse/BEAM-7861
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>
> BEAM-3645 makes it possible to run a map task in parallel.
> However, users need to change the runner when switching between multithreading 
> and multiprocessing mode.
> We want to add a flag (e.g. --use-multiprocess) to make the switch easy 
> without changing the runner each time.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7888) Test Multi Process Direct Runner With Largish Data

2019-08-08 Thread Hannah Jiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannah Jiang updated BEAM-7888:
---
Issue Type: Sub-task  (was: Bug)
Parent: BEAM-3645

> Test Multi Process Direct Runner With Largish Data
> --
>
> Key: BEAM-7888
> URL: https://issues.apache.org/jira/browse/BEAM-7888
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Ahmet Altay
>Assignee: Hannah Jiang
>Priority: Major
>
> Filing this as a tracker.
> We can test the multiprocess runner with a largish amount of data, to the extent 
> that we can do this on Jenkins. This will serve 2 purposes:
> - Find out issues related to multiprocessing. It would be easier to find 
> rare issues when running over non-trivial data.
> - Serve as a baseline (if not a benchmark) to understand the limits of the 
> multiprocess runner.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-6907) Standardize Gradle projects/tasks structure for Python SDK

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6907?focusedWorklogId=291494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291494
 ]

ASF GitHub Bot logged work on BEAM-6907:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:26
Start Date: 08/Aug/19 18:26
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #9277: [BEAM-6907] Reuse 
Python tarball in tox & dataflow integration tests
URL: https://github.com/apache/beam/pull/9277#issuecomment-519634685
 
 
   Rerun [Python37_PostCommit 
#17](https://builds.apache.org/job/beam_PostCommit_Python37_PR/17/) and 
[Python_PreCommit 
#738](https://builds.apache.org/job/beam_PreCommit_Python_Phrase/738/), and 
they all passed.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291494)
Time Spent: 4h  (was: 3h 50m)

> Standardize Gradle projects/tasks structure for Python SDK
> --
>
> Key: BEAM-6907
> URL: https://issues.apache.org/jira/browse/BEAM-6907
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> As Gradle parallelism was applied to Python tests and more Python versions were 
> added to tests, the way Gradle manages projects/tasks changed a lot. Friction was 
> generated during the Gradle refactor, since some projects defined separate build 
> scripts under the source directory. Thus, it will be better to standardize how we 
> use Gradle. This will help to manage Python tests/builds/tasks across 
> different versions and runners, and also make it easy for people to learn/use/develop.
> In general, we may want to:
> - Apply parallel execution
> - Share common tasks
> - Centralize test related tasks
> - Have a clear Gradle structure for projects/tasks



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-6907) Standardize Gradle projects/tasks structure for Python SDK

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6907?focusedWorklogId=291493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291493
 ]

ASF GitHub Bot logged work on BEAM-6907:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:25
Start Date: 08/Aug/19 18:25
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #9277: [BEAM-6907] Reuse 
Python tarball in tox & dataflow integration tests
URL: https://github.com/apache/beam/pull/9277#issuecomment-519634685
 
 
   Rerun 
[Python37_PostCommit](https://builds.apache.org/job/beam_PostCommit_Python37_PR/17/)
 and 
[Python_PreCommit](https://builds.apache.org/job/beam_PreCommit_Python_Phrase/738/),
 and they all passed.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291493)
Time Spent: 3h 50m  (was: 3h 40m)

> Standardize Gradle projects/tasks structure for Python SDK
> --
>
> Key: BEAM-6907
> URL: https://issues.apache.org/jira/browse/BEAM-6907
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> As Gradle parallelism was applied to Python tests and more Python versions were 
> added to tests, the way Gradle manages projects/tasks changed a lot. Friction was 
> generated during the Gradle refactor, since some projects defined separate build 
> scripts under the source directory. Thus, it will be better to standardize how we 
> use Gradle. This will help to manage Python tests/builds/tasks across 
> different versions and runners, and also make it easy for people to learn/use/develop.
> In general, we may want to:
> - Apply parallel execution
> - Share common tasks
> - Centralize test related tasks
> - Have a clear Gradle structure for projects/tasks



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291489
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:22
Start Date: 08/Aug/19 18:22
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9261: [BEAM-7389] Add 
code examples for Partition page
URL: https://github.com/apache/beam/pull/9261#discussion_r312179194
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/partition.md
 ##
 @@ -39,12 +46,110 @@ You cannot determine the number of partitions in 
mid-pipeline
 See more information in the [Beam Programming Guide]({{ site.baseurl 
}}/documentation/programming-guide/#partition).
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Filter]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/filter) is useful if the 
function is just 
+In the following examples, we create a pipeline with a `PCollection` of 
produce with their icon, name, and duration.
+Then, we apply `Partition` in multiple ways to split the `PCollection` into 
multiple `PCollections`.
+
+`Partition` accepts a function that receives the number of partitions,
+and returns the index of the desired partition for the element.
+The number of partitions passed must be a positive integer,
+and the function must return an integer in the range `0` to `num_partitions-1`.
+
+### Example 1: Partition with a function
+
+In the following example, we have a known list of durations.
+We partition the `PCollection` into one `PCollection` for every duration type.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
 tag:partition_function %}```
+
+Output `PCollection`s:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
 tag:partitions %}```
+
+
+  
+  [View on GitHub: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py]
+
+
+### Example 2: Partition with a lambda function
+
+We can also use lambda functions to simplify **Example 1**.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
 tag:partition_lambda %}```
+
+Output `PCollection`s:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
 tag:partitions %}```
+
+
+  [View on GitHub: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py]
+
+
+
+### Example 3: Partition with multiple arguments
+
+You can pass functions with multiple arguments to `Partition`.
+They are passed as additional positional arguments or keyword arguments to the 
function.
+
+In this example, `split_dataset` takes `plant`, `num_partitions`, and `ratio` 
as arguments.
+`num_partitions` is used by `Partition` as a positional argument,
+while any other argument will be passed to `split_dataset`.
 
 Review comment:
   Can you add some more explanation using concrete details? For 
example, what the sample will do with the arguments in 
beam.Partition(split_dataset, 2, ratio=[8, 2]) and how that affects the output. 
   
   "test_dataset" and "train_dataset" might make sense to some, but aren't 
generic enough. Either rename them to something like "dataset1" and "dataset2", 
or explain them before the code.
   
   Something along the lines of "In this example, we want to split the dataset 
into a training dataset and a testing dataset using `Partition`." And 
elaborate on what setting the ratio to [8, 2] does.
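
To make the review discussion concrete, here is a self-contained variant of the
example under discussion. The bucketing scheme is an assumption for
illustration; the real snippet lives in partition.py at the link above:

```py
import apache_beam as beam

def split_dataset(plant, num_partitions, ratio):
    # Route each element to a partition so sizes roughly follow `ratio`,
    # e.g. ratio=[8, 2] sends ~80% to partition 0 and ~20% to partition 1.
    assert num_partitions == len(ratio)
    bucket = sum(map(ord, plant['name'])) % sum(ratio)
    total = 0
    for i, part in enumerate(ratio):
        total += part
        if bucket < total:
            return i
    return len(ratio) - 1

with beam.Pipeline() as pipeline:
    train_dataset, test_dataset = (
        pipeline
        | beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        ])
        | beam.Partition(split_dataset, 2, ratio=[8, 2]))
    train_dataset | 'train' >> beam.Map(print)
```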
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291489)
Time Spent: 38h 50m  (was: 38h 40m)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
> 

[jira] [Resolved] (BEAM-7836) Provide a way to exclude unit / integration tests with incompatible syntax from running under particular version of Python.

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri resolved BEAM-7836.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> Provide a way to exclude unit / integration tests with incompatible syntax 
> from running under particular version of Python.
> ---
>
> Key: BEAM-7836
> URL: https://issues.apache.org/jira/browse/BEAM-7836
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, testing
>Reporter: Valentyn Tymofieiev
>Assignee: Udi Meiri
>Priority: Major
> Fix For: Not applicable
>
>
> Beam currently runs Python tests in 4 versions of Python: 2.7, 3.5, 3.6, 3.7. 
> Some Python 3 tests may include code that is considered incorrect syntax in 
> older versions of Python, and such tests may break test suites that they are 
> not intended for; see an example: [1] 
> We use `exec` as a workaround in (very few) tests, example: [2], but it is 
> less convenient to work with such code; for example, it is harder to debug or 
> work with in an IDE. 
> We should find best practices to deal with this problem and introduce a 
> solution to Beam.  
> cc: [~markflyhigh], [~udim],  [~yoshiki.obata] 
> [1] https://github.com/apache/beam/pull/8505#issuecomment-498441270
> [2] 
> https://github.com/apache/beam/blob/6cf3b1133658e963b6cc23f780480c3359e79ad6/sdks/python/apache_beam/internal/pickler_test.py#L103
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7836) Provide a way to exclude unit / integration tests with incompatible syntax from running under particular version of Python.

2019-08-08 Thread Udi Meiri (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903209#comment-16903209
 ] 

Udi Meiri commented on BEAM-7836:
-

The spotless and other Gradle task breakages were fixed as a part of 
https://issues.apache.org/jira/browse/BEAM-7892
Closing.

> Provide a way to exclude unit / integration tests with incompatible syntax 
> from running under particular version of Python.
> ---
>
> Key: BEAM-7836
> URL: https://issues.apache.org/jira/browse/BEAM-7836
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, testing
>Reporter: Valentyn Tymofieiev
>Assignee: Udi Meiri
>Priority: Major
>
> Beam currently runs Python tests in 4 versions of Python: 2.7, 3.5, 3.6, 3.7. 
> Some Python 3 tests may include code that is considered incorrect syntax in 
> older versions of Python, and such tests may break test suites that they are 
> not intended for; see an example: [1] 
> We use `exec` as a workaround in (very few) tests, example: [2], but it is 
> less convenient to work with such code; for example, it is harder to debug or 
> work with in an IDE. 
> We should find best practices to deal with this problem and introduce a 
> solution to Beam.  
> cc: [~markflyhigh], [~udim],  [~yoshiki.obata] 
> [1] https://github.com/apache/beam/pull/8505#issuecomment-498441270
> [2] 
> https://github.com/apache/beam/blob/6cf3b1133658e963b6cc23f780480c3359e79ad6/sdks/python/apache_beam/internal/pickler_test.py#L103
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7523) Add a server for KafkaTable Integration Test

2019-08-08 Thread Alireza Samadianzakaria (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Samadianzakaria updated BEAM-7523:
--
Description: 
We need a server for the Kafka integration test, and after setting it up we can 
include KafkaCSVTableIT.java in the integration tests.

Also, we can add more tests to KafkaCSVTableIT.java.

  was:We need integration test for KafkaTable and KafkaTableProvider in SQL 
module to create and run SQ with KafkaTable as its source.

Summary: Add a server for KafkaTable Integration Test  (was: Add 
Integration Test for KafkaTable and KafkaTableProvider)

> Add a server for KafkaTable Integration Test
> 
>
> Key: BEAM-7523
> URL: https://issues.apache.org/jira/browse/BEAM-7523
> Project: Beam
>  Issue Type: Test
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Priority: Minor
>
> We need a server for the Kafka integration test, and after setting it up we can 
> include KafkaCSVTableIT.java in the integration tests.
> Also, we can add more tests to KafkaCSVTableIT.java.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291482&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291482
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:11
Start Date: 08/Aug/19 18:11
Worklog Time Spent: 10m 
  Work Description: riazela commented on pull request #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298
 
 
   This implements the rate estimation for KafkaTable. It also has unit tests 
and integration tests for it. 
   
   Since we don't have a Kafka server, the integration test is excluded from 
the gradle file. Once we have deployed a Kafka server for testing, we can 
include its address and topic name and then include the integration test.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   
   (Build-status badge table for the Go, Java, and Python post-commit jobs on 
builds.apache.org; the badge markup was truncated in this archive.)

[jira] [Work logged] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?focusedWorklogId=291483=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291483
 ]

ASF GitHub Bot logged work on BEAM-7896:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:11
Start Date: 08/Aug/19 18:11
Worklog Time Spent: 10m 
  Work Description: riazela commented on issue #9298: [BEAM-7896] 
Implementing RateEstimation for KafkaTable 
URL: https://github.com/apache/beam/pull/9298#issuecomment-519629374
 
 
   R: @akedin 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291483)
Time Spent: 20m  (was: 10m)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
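
For intuition, here is a minimal Python sketch of the estimation idea described 
above: keep the arrival times of recently observed tuples and derive a 
records-per-second figure from them. The actual KafkaTable implementation lives 
in the Java dsl-sql module; the class and parameter names below (`RateEstimator`, 
`sample_window`) are hypothetical.

```py
import time
from collections import deque

class RateEstimator(object):
    """Hypothetical sketch: estimate records/sec from recent arrival times."""

    def __init__(self, sample_window=100):
        # Keep the arrival timestamps of the most recent `sample_window` tuples.
        self._arrivals = deque(maxlen=sample_window)

    def record_arrival(self, timestamp=None):
        self._arrivals.append(time.time() if timestamp is None else timestamp)

    def rate(self):
        # Not enough samples yet: the rate is unknown.
        if len(self._arrivals) < 2:
            return None
        elapsed = self._arrivals[-1] - self._arrivals[0]
        if elapsed <= 0:
            return None
        # n arrivals span (n - 1) inter-arrival gaps.
        return (len(self._arrivals) - 1) / elapsed
```

Returning `None` when there are too few samples corresponds to the UNKNOWN 
statistic mentioned in the issue.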


[jira] [Assigned] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread Alireza Samadianzakaria (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Samadianzakaria reassigned BEAM-7896:
-

Assignee: Alireza Samadianzakaria

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7898) Remove Default Implementation of getStatistics for Tables

2019-08-08 Thread Alireza Samadianzakaria (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Samadianzakaria resolved BEAM-7898.
---
   Resolution: Fixed
Fix Version/s: Not applicable

> Remove Default Implementation of getStatistics for Tables
> -
>
> Key: BEAM-7898
> URL: https://issues.apache.org/jira/browse/BEAM-7898
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> There is a default implementation for getRowCount. This may cause problems in 
> the future because people may forget to implement it for new tables. The 
> default just returns UNKNOWN; we should add an explicit implementation to all the 
> tables that have not implemented getStatistics.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7777) Stream Cost Model and RelNode Cost Estimation

2019-08-08 Thread Alireza Samadianzakaria (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Samadianzakaria resolved BEAM-.
---
   Resolution: Implemented
Fix Version/s: Not applicable

> Stream Cost Model and RelNode Cost Estimation
> -
>
> Key: BEAM-
> URL: https://issues.apache.org/jira/browse/BEAM-
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Currently the cost model is not suitable for streaming jobs: it uses row 
> count, and for estimating the output row count of each node it relies on 
> Calcite estimations that are only applicable to bounded data.
> We need to implement a new cost model and cost estimation for all 
> of our physical nodes.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7896) Rate estimation for Kafka Table

2019-08-08 Thread Alireza Samadianzakaria (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Samadianzakaria updated BEAM-7896:
--
Status: Open  (was: Triage Needed)

> Rate estimation for Kafka Table
> ---
>
> Key: BEAM-7896
> URL: https://issues.apache.org/jira/browse/BEAM-7896
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Alireza Samadianzakaria
>Assignee: Alireza Samadianzakaria
>Priority: Major
>
> Currently, KafkaTable returns UNKNOWN statistics for its rate. 
> We can use previously arrived tuples to estimate the rate and return correct 
> statistics (See 
> [https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY|https://docs.google.com/document/d/1vi1PBBu5IqSy-qZl1Gk-49CcANOpbNs1UAud6LnOaiY/])
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Reopened] (BEAM-7608) v1new ReadFromDatastore skips entities

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri reopened BEAM-7608:
-

reopening to get included in 2.15

> v1new ReadFromDatastore skips entities
> --
>
> Key: BEAM-7608
> URL: https://issues.apache.org/jira/browse/BEAM-7608
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Affects Versions: 2.13.0
> Environment: MacOS 10.14.5, Python 2.7
>Reporter: Jacob Gur
>Assignee: Udi Meiri
>Priority: Critical
> Fix For: 2.15.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> A simple map over a datastore kind in the local emulator using the new 
> v1new.datastoreio.ReadFromDatastore skips entities.
> The kind has 1516 entities, and when I map over it using the old 
> ReadFromDatastore transform, it maps all of them, i.e., I can map to id and 
> write to text file.
> But the new transform only maps 365 entities. There is no error. The tail of 
> the standard output is:
> {code:java}
> INFO:root:Latest stats timestamp for kind face_apilog is 2019-06-18 
> 08:15:21+00:00
>  INFO:root:Estimated size bytes for query: 116188
>  INFO:root:Splitting the query into 12 splits
>  INFO:root:Running 
> (((GetEntities/Reshuffle/ReshufflePerKey/GroupByKey/Read)(ref_AppliedPTransform_GetEntities/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))((ref_AppliedPTransform_GetEntities/Reshuffle/RemoveRandomKeys_15)(ref_AppliedPTransform_GetEntities/Read_16)))((ref_AppliedPTransform_MapToId_17)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WriteBundles_24)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Pair_25)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WindowInto(WindowIntoFn)_26)(WriteToFile/Write/WriteImpl/GroupByKey/Write)
>  INFO:root:Running 
> (WriteToFile/Write/WriteImpl/GroupByKey/Read)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Extract_31)(ref_PCollection_PCollection_20/Write))
>  INFO:root:Running 
> (ref_PCollection_PCollection_12/Read)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/PreFinalize_32)(ref_PCollection_PCollection_21/Write))
>  INFO:root:Running 
> (ref_PCollection_PCollection_12/Read)+(ref_AppliedPTransform_WriteToFile/Write/WriteImpl/FinalizeWrite_33)
>  INFO:root:Starting finalize_write threads with num_shards: 1 (skipped: 0), 
> batches: 1, num_threads: 1
>  INFO:root:Renamed 1 shards in 0.12 seconds.{code}
>  
> The code for the job on the new transform is:
>  
>  
> {code:python}
> from __future__ import absolute_import
> import logging
> import os
> import sys
> import apache_beam as beam
> from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
> from apache_beam.io.gcp.datastore.v1new.types import Query
> # TODO: should be set outside of python process
> os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
> def map_to_id(element):
>     face_log_id = element.to_client_entity().id
>     return face_log_id
> def run(argv=None):
>     p = beam.Pipeline(argv=argv)
>     project = 'dev'
>     (p
>      | 'GetEntities' >> ReadFromDatastore(Query(kind='face_apilog', project=project))
>      | 'MapToId' >> beam.Map(map_to_id)
>      | 'WriteToFile' >> beam.io.WriteToText('result')
>     )
>     p.run().wait_until_finish()
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(sys.argv){code}
>  
> For comparison, the code for the job on the old transform is:
>  
> {code:python}
> from __future__ import absolute_import
> import logging
> import os
> import sys
> import apache_beam as beam
> from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
> from google.cloud.proto.datastore.v1 import query_pb2
> # TODO: should be set outside of python process
> os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
> def map_to_id(element):
>     face_log_id = element.key.path[-1].id
>     return face_log_id
> def run(argv=None):
>     p = beam.Pipeline(argv=argv)
>     project = 'dev'
>     query = query_pb2.Query()
>     query.kind.add().name = 'face_apilog'
>     (p
>      | 'GetEntities' >> ReadFromDatastore(project=project, query=query)
>      # TODO: ParDo???
>      | 'MapToId' >> beam.Map(map_to_id)
>      | 'WriteToFile' >> beam.io.WriteToText('result')
>     )
>     p.run().wait_until_finish()
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(sys.argv){code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Reopened] (BEAM-7828) Add key type conversion in from and to client entity in Datastore v1new IO.

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri reopened BEAM-7828:
-

Reopening to get cherrypicked on 2.15

> Add key type conversion in from and to client entity in Datastore v1new IO.
> ---
>
> Key: BEAM-7828
> URL: https://issues.apache.org/jira/browse/BEAM-7828
> Project: Beam
>  Issue Type: Sub-task
>  Components: io-py-gcp
>Reporter: Valentyn Tymofieiev
>Assignee: Udi Meiri
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7828) Add key type conversion in from and to client entity in Datastore v1new IO.

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri updated BEAM-7828:

Fix Version/s: (was: 2.16.0)
   2.15.0

> Add key type conversion in from and to client entity in Datastore v1new IO.
> ---
>
> Key: BEAM-7828
> URL: https://issues.apache.org/jira/browse/BEAM-7828
> Project: Beam
>  Issue Type: Sub-task
>  Components: io-py-gcp
>Reporter: Valentyn Tymofieiev
>Assignee: Udi Meiri
>Priority: Major
> Fix For: 2.15.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291478=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291478
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:02
Start Date: 08/Aug/19 18:02
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9258: [BEAM-7389] Add 
code examples for KvSwap page
URL: https://github.com/apache/beam/pull/9258#discussion_r312169150
 
 

 ##
 File path: website/src/documentation/transforms/python/element-wise/kvswap.md
 ##
 @@ -19,21 +19,64 @@ limitations under the License.
 -->
 
 # Kvswap
-
-
+localStorage.setItem('language', 'language-py')
+
+
+
+  
+https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.KvSwap;>
-  https://beam.apache.org/images/logos/sdks/python.png; 
width="20px" height="20px"
-   alt="Pydoc" />
- Pydoc
+  https://beam.apache.org/images/logos/sdks/python.png;
+  width="20px" height="20px" alt="Pydoc" />
+  Pydoc
 
+  
 
 
-Takes a collection of key-value pairs and returns a collection of key-value 
pairs 
+
+Takes a collection of key-value pairs and returns a collection of key-value 
pairs
 which has each key and value swapped.
 
-## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
+## Examplse
 
 Review comment:
   Typo "Examples"
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291478)
Time Spent: 38.5h  (was: 38h 20m)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 38.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291479=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291479
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 18:02
Start Date: 08/Aug/19 18:02
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9258: [BEAM-7389] Add 
code examples for KvSwap page
URL: https://github.com/apache/beam/pull/9258#discussion_r312170398
 
 

 ##
 File path: website/src/documentation/transforms/python/element-wise/kvswap.md
 ##
 @@ -19,21 +19,64 @@ limitations under the License.
 -->
 
 # Kvswap
-
-
+localStorage.setItem('language', 'language-py')
+
+
+
+  
+https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.KvSwap;>
-  https://beam.apache.org/images/logos/sdks/python.png; 
width="20px" height="20px"
-   alt="Pydoc" />
- Pydoc
+  https://beam.apache.org/images/logos/sdks/python.png;
+  width="20px" height="20px" alt="Pydoc" />
+  Pydoc
 
+  
 
 
-Takes a collection of key-value pairs and returns a collection of key-value 
pairs 
+
+Takes a collection of key-value pairs and returns a collection of key-value 
pairs
 which has each key and value swapped.
 
-## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
+## Examplse
+
+In the following example, we create a pipeline with a `PCollection` of 
key-value pairs.
+Then, we apply `Keys` to extract the keys and discarding the values.
 
 Review comment:
   "Then, we apply `KvSwap` to swap the keys and values."
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291479)
Time Spent: 38h 40m  (was: 38.5h)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 38h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
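
As background for the review comments above: `KvSwap` takes a collection of 
key-value pairs and swaps each key with its value. A minimal sketch, assuming 
made-up sample pairs:

```py
import apache_beam as beam

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([('apple', 1), ('banana', 2)])
        | beam.KvSwap()  # ('apple', 1) -> (1, 'apple'), etc.
        | beam.Map(print)
    )
```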


[jira] [Comment Edited] (BEAM-7929) ParquetTable.buildIOReader should support column projection and filter predicate

2019-08-08 Thread Rui Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903192#comment-16903192
 ] 

Rui Wang edited comment on BEAM-7929 at 8/8/19 6:01 PM:


Calcite has good support for filter/projection pushdown, so we don't need to 
analyze the Calcite plan ourselves; Calcite will do that for us through 
optimization rules and calling conventions.

For example, BeamSQL has adopted some rules: 
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java#L73


There are many useful links on this topic, for example:

http://mail-archives.apache.org/mod_mbox/calcite-dev/201610.mbox/%3c96745466-0bfb-49b4-8396-70a992cae...@apache.org%3E
https://stackoverflow.com/questions/40217160/how-to-push-down-project-filter-aggregation-to-tablescan-in-calcite


There is also a good slide deck covering many pieces of Calcite: 
https://www.slideshare.net/JordanHalterman/introduction-to-apache-calcite


was (Author: amaliujia):
Calcite has good support for filter/pushdown, so we don't need to analyze the 
Calcite plan ourselves; Calcite will do that for us through optimization rules 
and calling conventions.

For example, BeamSQL has adopted some rules: 
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java#L73


There are many useful links on this topic, for example:

http://mail-archives.apache.org/mod_mbox/calcite-dev/201610.mbox/%3c96745466-0bfb-49b4-8396-70a992cae...@apache.org%3E
https://stackoverflow.com/questions/40217160/how-to-push-down-project-filter-aggregation-to-tablescan-in-calcite


There is also a good slide deck covering many pieces of Calcite: 
https://www.slideshare.net/JordanHalterman/introduction-to-apache-calcite

> ParquetTable.buildIOReader should support column projection and filter 
> predicate
> 
>
> Key: BEAM-7929
> URL: https://issues.apache.org/jira/browse/BEAM-7929
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Affects Versions: 2.14.0
>Reporter: Neville Li
>Priority: Critical
>
> To leverage the performance improvements in Parquet.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7929) ParquetTable.buildIOReader should support column projection and filter predicate

2019-08-08 Thread Rui Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903192#comment-16903192
 ] 

Rui Wang commented on BEAM-7929:


Calcite has good support for filter/pushdown, so we don't need to analyze the 
Calcite plan ourselves; Calcite will do that for us through optimization rules 
and calling conventions.

For example, BeamSQL has adopted some rules: 
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java#L73


There are many useful links on this topic, for example:

http://mail-archives.apache.org/mod_mbox/calcite-dev/201610.mbox/%3c96745466-0bfb-49b4-8396-70a992cae...@apache.org%3E
https://stackoverflow.com/questions/40217160/how-to-push-down-project-filter-aggregation-to-tablescan-in-calcite


There is also a good slide deck covering many pieces of Calcite: 
https://www.slideshare.net/JordanHalterman/introduction-to-apache-calcite

> ParquetTable.buildIOReader should support column projection and filter 
> predicate
> 
>
> Key: BEAM-7929
> URL: https://issues.apache.org/jira/browse/BEAM-7929
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Affects Versions: 2.14.0
>Reporter: Neville Li
>Priority: Critical
>
> To leverage the performance improvements in Parquet.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
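
Independent of the Calcite plumbing discussed above, here is a small Python 
sketch of what column projection and a filter predicate buy at the Parquet 
layer, using pyarrow (assuming a reasonably recent version; the file and column 
names are made up, and the actual ParquetTable change would land in the Java SDK):

```py
import pyarrow.parquet as pq

# Column projection: read only the columns we need.
# Filter predicate: skip rows (or at least row groups) that cannot match.
table = pq.read_table(
    'events.parquet',                  # hypothetical file
    columns=['user_id', 'amount'],     # hypothetical schema
    filters=[('amount', '>', 100)],
)
print(table.num_rows)
```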


[jira] [Closed] (BEAM-2103) Document Python 3 support in Beam starting from 2.14.0

2019-08-08 Thread Valentyn Tymofieiev (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentyn Tymofieiev closed BEAM-2103.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> Document Python 3 support in Beam starting from 2.14.0
> --
>
> Key: BEAM-2103
> URL: https://issues.apache.org/jira/browse/BEAM-2103
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core, website
>Affects Versions: 0.6.0
>Reporter: Tobias Kaymak
>Assignee: Rose Nguyen
>Priority: Blocker
> Fix For: Not applicable
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Beam website documentation should mention Python 3.5 - 3.7 support in 
> addition to Python 2.7. Available user documentation (e.g. quickstarts) 
> should be adjusted where needed, to accommodate for Beam Python 3 users.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291470=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291470
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312163809
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
+`beam.window.TimestampedValue` will take a value and a timestamp in the form 
of seconds, where a unix timestamp can be used.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:event_time %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_timestamps %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+To convert from a
+[`time.struct_time`](https://docs.python.org/3/library/time.html#time.struct_time)
+to `unix_time` you can use
+[`time.mktime`](https://docs.python.org/3/library/time.html#time.mktime).
+For more information on time formatting options, see
+[`time.strftime`](https://docs.python.org/3/library/time.html#time.strftime).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:time_tuple2unix_time %}```
+
+To convert from a
+[`datetime.datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime)
+to `unix_time` you can use convert it to a `time.struct_time` first with
+[`datetime.timetuple`](https://docs.python.org/3/library/datetime.html#datetime.datetime.timetuple).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:datetime2unix_time %}```
+
+### Example 2: Timestamp by logical clock
+
+If the elements have a chronological number, those can be used as a
+[logical clock](https://en.wikipedia.org/wiki/Logical_clock).
+They have to be converted to a *"seconds"* equivalent, which can be especially 
important depending on your windowing and late data rules.
 
 Review comment:
   Replace "They" with whichever noun you meant ("These elements" or "These 
numbers")
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291470)
Time Spent: 37h 40m  (was: 37.5h)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 37h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
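
Pulling this review thread together, here is a minimal, self-contained sketch 
of the event-time pattern under discussion: attach a timestamp with 
`TimestampedValue`, converting a `datetime.datetime` to Unix seconds via 
`time.struct_time` and `time.mktime`. The sample element is made up; this is 
not the snippet behind the PR's `github_sample` tags.

```py
import datetime
import time

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

def to_unix_time(dt):
    # datetime.datetime -> time.struct_time -> seconds since the epoch.
    return time.mktime(dt.timetuple())

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([
            {'name': 'Strawberry', 'season': datetime.datetime(2019, 4, 1)},
        ])
        | 'With timestamps' >> beam.Map(
            lambda plant: TimestampedValue(plant, to_unix_time(plant['season'])))
    )
```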


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291473=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291473
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312164889
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
+`beam.window.TimestampedValue` will take a value and a timestamp in the form 
of seconds, where a unix timestamp can be used.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:event_time %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_timestamps %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+To convert from a
+[`time.struct_time`](https://docs.python.org/3/library/time.html#time.struct_time)
+to `unix_time` you can use
+[`time.mktime`](https://docs.python.org/3/library/time.html#time.mktime).
+For more information on time formatting options, see
+[`time.strftime`](https://docs.python.org/3/library/time.html#time.strftime).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:time_tuple2unix_time %}```
+
+To convert from a
+[`datetime.datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime)
+to `unix_time` you can use convert it to a `time.struct_time` first with
+[`datetime.timetuple`](https://docs.python.org/3/library/datetime.html#datetime.datetime.timetuple).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:datetime2unix_time %}```
+
+### Example 2: Timestamp by logical clock
+
+If the elements have a chronological number, those can be used as a
+[logical clock](https://en.wikipedia.org/wiki/Logical_clock).
+They have to be converted to a *"seconds"* equivalent, which can be especially 
important depending on your windowing and late data rules.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:logical_clock %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_events %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+### Example 3: Timestamp by processing time
+
+If the elements do not have any time data available, you can also use the 
current processing time for each element.
+Note that this will grab the local time of the *worker* that is processing 
each element.
+Workers might have time deltas, so it is not a reliable way to do precise 
ordering.
 
 Review comment:
   "it"->"using this method"
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, 

[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291472=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291472
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312165421
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
+`beam.window.TimestampedValue` will take a value and a timestamp in the form 
of seconds, where a unix timestamp can be used.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:event_time %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_timestamps %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+To convert from a
+[`time.struct_time`](https://docs.python.org/3/library/time.html#time.struct_time)
+to `unix_time` you can use
+[`time.mktime`](https://docs.python.org/3/library/time.html#time.mktime).
+For more information on time formatting options, see
+[`time.strftime`](https://docs.python.org/3/library/time.html#time.strftime).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:time_tuple2unix_time %}```
+
+To convert from a
+[`datetime.datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime)
+to `unix_time` you can use convert it to a `time.struct_time` first with
+[`datetime.timetuple`](https://docs.python.org/3/library/datetime.html#datetime.datetime.timetuple).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:datetime2unix_time %}```
+
+### Example 2: Timestamp by logical clock
+
+If the elements have a chronological number, those can be used as a
+[logical clock](https://en.wikipedia.org/wiki/Logical_clock).
+They have to be converted to a *"seconds"* equivalent, which can be especially 
important depending on your windowing and late data rules.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:logical_clock %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_events %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+### Example 3: Timestamp by processing time
+
+If the elements do not have any time data available, you can also use the 
current processing time for each element.
+Note that this will grab the local time of the *worker* that is processing 
each element.
 
 Review comment:
   "will grab"->"grabs"
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking

[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291475=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291475
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312167188
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
+`beam.window.TimestampedValue` will take a value and a timestamp in the form 
of seconds, where a unix timestamp can be used.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:event_time %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_timestamps %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+To convert from a
+[`time.struct_time`](https://docs.python.org/3/library/time.html#time.struct_time)
+to `unix_time` you can use
+[`time.mktime`](https://docs.python.org/3/library/time.html#time.mktime).
+For more information on time formatting options, see
+[`time.strftime`](https://docs.python.org/3/library/time.html#time.strftime).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:time_tuple2unix_time %}```
+
+To convert from a
+[`datetime.datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime)
+to `unix_time` you can use convert it to a `time.struct_time` first with
+[`datetime.timetuple`](https://docs.python.org/3/library/datetime.html#datetime.datetime.timetuple).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:datetime2unix_time %}```
+
+### Example 2: Timestamp by logical clock
+
+If the elements have a chronological number, those can be used as a
+[logical clock](https://en.wikipedia.org/wiki/Logical_clock).
+They have to be converted to a *"seconds"* equivalent, which can be especially 
important depending on your windowing and late data rules.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:logical_clock %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_events %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+### Example 3: Timestamp by processing time
+
+If the elements do not have any time data available, you can also use the 
current processing time for each element.
+Note that this will grab the local time of the *worker* that is processing 
each element.
+Workers might have time deltas, so it is not a reliable way to do precise 
ordering.
+
+Using the processing time also means that the timestamp will be attached as 
soon as the element *enters* into the pipeline,
 
 Review comment:
   WDYT of this suggestion?
   
   By using processing time, there is no way of knowing if data is arriving 
late because the timestamp is attached when the element 

[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291471=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291471
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312158080
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
 
 Review comment:
   "The elements themselves often already contain a timestamp field."
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291471)
Time Spent: 37h 50m  (was: 37h 40m)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 37h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291469
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312156564
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
 
 Review comment:
   Clause order
   
   "When windowing and late data play an important role in streaming pipelines, 
timestamps are especially useful."
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291469)
Time Spent: 37.5h  (was: 37h 20m)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 37.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291474=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291474
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312159071
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
+`beam.window.TimestampedValue` will take a value and a timestamp in the form 
of seconds, where a unix timestamp can be used.
 
 Review comment:
   "`beam.window.TimestampedValue` takes a value and a Unix timestamp in the 
form of seconds."
   
   ^is this sentence still accurate? Else try shortening the sentence:
   
   "`beam.window.TimestampedValue` takes a value and a timestamp in the form of 
seconds."
   
   and you have a comment after this section on converting to Unix timestamps.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291474)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 38h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7389) Colab examples for element-wise transforms (Python)

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7389?focusedWorklogId=291476=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291476
 ]

ASF GitHub Bot logged work on BEAM-7389:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:56
Start Date: 08/Aug/19 17:56
Worklog Time Spent: 10m 
  Work Description: rosetn commented on pull request #9267: [BEAM-7389] Add 
code examples for WithTimestamps page
URL: https://github.com/apache/beam/pull/9267#discussion_r312163529
 
 

 ##
 File path: 
website/src/documentation/transforms/python/element-wise/withtimestamps.md
 ##
 @@ -19,10 +19,116 @@ limitations under the License.
 -->
 
 # WithTimestamps
+
+
+localStorage.setItem('language', 'language-py')
+
+
 Assigns timestamps to all the elements of a collection.
 
 ## Examples
-See [BEAM-7389](https://issues.apache.org/jira/browse/BEAM-7389) for updates. 
 
-## Related transforms 
-* [Reify]({{ site.baseurl 
}}/documentation/transforms/python/elementwise/reify) converts between explicit 
and implicit forms of Beam values.
\ No newline at end of file
+In the following examples, we create a pipeline with a `PCollection` and 
attach a timestamp value to each of its elements.
+Timestamps are especially useful on streaming pipelines where windowing and 
late data play a more important role.
+
+### Example 1: Timestamp by event time
+
+Often times, the elements themselves already contain a timestamp field that 
can be used.
+`beam.window.TimestampedValue` will take a value and a timestamp in the form 
of seconds, where a unix timestamp can be used.
+
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:event_time %}```
+
+Output `PCollection` after getting the timestamps:
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
 tag:plant_timestamps %}```
+
+
+  
+https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py;>
+  https://www.tensorflow.org/images/GitHub-Mark-32px.png;
+width="20px" height="20px" alt="View on GitHub" />
+  View on GitHub
+
+  
+
+
+
+To convert from a
+[`time.struct_time`](https://docs.python.org/3/library/time.html#time.struct_time)
+to `unix_time` you can use
+[`time.mktime`](https://docs.python.org/3/library/time.html#time.mktime).
+For more information on time formatting options, see
+[`time.strftime`](https://docs.python.org/3/library/time.html#time.strftime).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:time_tuple2unix_time %}```
+
+To convert from a
+[`datetime.datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime)
+to `unix_time` you can use convert it to a `time.struct_time` first with
+[`datetime.timetuple`](https://docs.python.org/3/library/datetime.html#datetime.datetime.timetuple).
+
+```
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
 tag:datetime2unix_time %}```
+
+### Example 2: Timestamp by logical clock
+
+If the elements have a chronological number, those can be used as a
 
 Review comment:
   Ambiguous pronoun. Do you mean
   "If each element has a chronological number, these elements can be used as a 
logical clock."
   or
   "If each element has a chronological number, these numbers can be used as a 
logical clock."
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291476)
Time Spent: 38h 20m  (was: 38h 10m)

> Colab examples for element-wise transforms (Python)
> ---
>
> Key: BEAM-7389
> URL: https://issues.apache.org/jira/browse/BEAM-7389
> Project: Beam
>  Issue Type: Improvement
>  Components: website
>Reporter: Rose Nguyen
>Assignee: David Cavazos
>Priority: Minor
>  Time Spent: 38h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (BEAM-7930) bundle_processor log spam using python SDK on dataflow runner

2019-08-08 Thread James Hutchison (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903189#comment-16903189
 ] 

James Hutchison commented on BEAM-7930:
---

If this isn't already a known issue I can try to provide more information.

> bundle_processor log spam using python SDK on dataflow runner
> -
>
> Key: BEAM-7930
> URL: https://issues.apache.org/jira/browse/BEAM-7930
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.13.0
>Reporter: James Hutchison
>Priority: Minor
>
> When running my pipeline on Dataflow, I can see in the Stackdriver logs a 
> large amount of spam from the following messages (note that the numbers in 
> them change with every message):
>  * [INFO] (bundle_processor.create_operation) No unique name set for 
> transform generatedPtransform-67
>  * [INFO] (bundle_processor.create_operation) No unique name for transform -19
>  * [ERROR] (bundle_processor.create) Missing required coder_id on grpc_port 
> for -19; using deprecated fallback.
> I tried setting a breakpoint where these log messages originate, using the 
> direct runner, and it was never hit, so I don't know specifically what is 
> causing them.
> I also tried using the logging module to change the threshold and also mocked 
> out the logging attribute in the bundle_processor module to change the log 
> level to CRITICAL and I still see the log messages.
> The pipeline is a streaming pipeline that reads from two pubsub topics, 
> merges the inputs and runs distinct on the inputs over each processing time 
> window, fetches from an external service, does processing, and inserts into 
> elasticsearch with failures going into bigquery. I notice the log messages 
> seem to cluster and this appears early on before any other log messages in 
> any of the other steps so I wonder if maybe this is coming from the pubsub 
> read or windowing portion.
> Expected behavior:
>  * I don't expect to see these noisy log messages which seem to indicate 
> something is wrong
>  * The missing required coder_id message is at the ERROR log level so it 
> pollutes the error logs. I would expect this to be at the WARNING or INFO 
> level.
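
For reference, a minimal sketch of the logger override described above (the
logger name is an assumption; per the report, this did not silence the
messages on Dataflow):

```
import logging

# Attempted mitigation from the report: raise the level of the module logger
# that appears to emit the spam. The logger name is an assumption; Dataflow's
# worker logging setup may bypass it, which would match the observed behavior.
logging.getLogger(
    'apache_beam.runners.worker.bundle_processor').setLevel(logging.CRITICAL)
```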



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Reopened] (BEAM-7476) Datastore write failures with "[Errno 32] Broken pipe"

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri reopened BEAM-7476:
-

Reopening so this gets cherry-picked onto 2.15.

> Datastore write failures with "[Errno 32] Broken pipe"
> --
>
> Key: BEAM-7476
> URL: https://issues.apache.org/jira/browse/BEAM-7476
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Affects Versions: 2.11.0
> Environment: dataflow python 2.7
>Reporter: Dmytro Sadovnychyi
>Assignee: Dmytro Sadovnychyi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.15.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> We are getting lots of Broken pipe errors and it's only a matter of luck for 
> write to succeed. It's been happening for months.
> Partial stack trace:
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/datastore/v1/helper.py",
>  line 225, in commit
> response = datastore.commit(request)
>   File 
> "/usr/local/lib/python2.7/dist-packages/googledatastore/connection.py", line 
> 140, in commit
> datastore_pb2.CommitResponse)
>   File 
> "/usr/local/lib/python2.7/dist-packages/googledatastore/connection.py", line 
> 199, in _call_method
> method='POST', body=payload, headers=headers)
>   File "/usr/local/lib/python2.7/dist-packages/oauth2client/transport.py", 
> line 169, in new_request
> redirections, connection_type)
>   File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 
> 1609, in request
> (response, content) = self._request(conn, authority, uri, request_uri, 
> method, body, headers, redirections, cachekey)
>   File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 
> 1351, in _request
> (response, content) = self._conn_request(conn, request_uri, method, body, 
> headers)
>   File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 
> 1273, in _conn_request
> conn.request(method, request_uri, body, headers)
>   File "/usr/lib/python2.7/httplib.py", line 1042, in request
> self._send_request(method, url, body, headers)
>   File "/usr/lib/python2.7/httplib.py", line 1082, in _send_request
> self.endheaders(body)
>   File "/usr/lib/python2.7/httplib.py", line 1038, in endheaders
> self._send_output(message_body)
>   File "/usr/lib/python2.7/httplib.py", line 882, in _send_output
> self.send(msg)
>   File "/usr/lib/python2.7/httplib.py", line 858, in send
> self.sock.sendall(data)
>   File "/usr/lib/python2.7/ssl.py", line 753, in sendall
> v = self.send(data[count:])
>   File "/usr/lib/python2.7/ssl.py", line 719, in send
> v = self._sslobj.write(data)
> RuntimeError: error: [Errno 32] Broken pipe [while running 'Groups to 
> datastore/Write Mutation to Datastore']
> Workaround: https://github.com/apache/beam/pull/8346
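
The linked workaround is not reproduced here; purely as an illustration, a
retry-with-backoff wrapper for this kind of transient socket failure might
look like the following (names are illustrative, not the PR's code):

```
import time

def commit_with_retry(commit_fn, request, attempts=5):
  # Illustrative only: retry a commit when the connection drops with a
  # "Broken pipe" error, backing off exponentially between attempts.
  for attempt in range(attempts):
    try:
      return commit_fn(request)
    except (RuntimeError, IOError) as e:
      if attempt == attempts - 1 or 'Broken pipe' not in str(e):
        raise
      time.sleep(2 ** attempt)
```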



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (BEAM-7476) Datastore write failures with "[Errno 32] Broken pipe"

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri updated BEAM-7476:

Status: Open  (was: Triage Needed)

> Datastore write failures with "[Errno 32] Broken pipe"
> --
>
> Key: BEAM-7476
> URL: https://issues.apache.org/jira/browse/BEAM-7476
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Affects Versions: 2.11.0
> Environment: dataflow python 2.7
>Reporter: Dmytro Sadovnychyi
>Assignee: Dmytro Sadovnychyi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.15.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291466=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291466
 ]

ASF GitHub Bot logged work on BEAM-7866:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:46
Start Date: 08/Aug/19 17:46
Worklog Time Spent: 10m 
  Work Description: y1chi commented on pull request #9233:  [BEAM-7866] Fix 
python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312163044
 
 

 ##
 File path: sdks/python/apache_beam/io/mongodbio_test.py
 ##
 @@ -30,38 +34,138 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdHelper
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
 
+class _MockMongoColl(object):
+  """Fake mongodb collection cursor."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    if '$and' not in filter or not filter['$and']:
+      return self
+    start = filter['$and'][0]['_id'].get('$gte')
+    end = filter['$and'][0]['_id'].get('$lt')
+    assert start is not None
+    assert end is not None
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, index):
+    return self.docs[index]
+
+
+class _MockMongoDb(object):
+  """Fake Mongo Db."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def __getitem__(self, coll_name):
+    return _MockMongoColl(self.docs)
+
+  def command(self, command, *args, **kwargs):
+    if command == 'collstats':
+      return {'size': 5, 'avgSize': 1}
+    elif command == 'splitVector':
+      return self.get_split_key(command, *args, **kwargs)
+
+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):
 Review comment:
   Done
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291466)
Time Spent: 9h 40m  (was: 9.5h)

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.15.0
>
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing the number of results in the 
> constructor, and then having each reader re-execute the whole query and take 
> an index sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add an order by `_id`, the database may be changing 
> concurrently with reading and splitting. E.g. if the database contained 
> documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total gives quadratic complexity
> - The query is first executed in the constructor in order to 
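
The fix under review replaces those index offsets with explicit `_id` range
filters (the same filter shape exercised by the mock collection above). A
minimal pymongo-style sketch of that pattern, with illustrative names and
bounds:

```
from bson.objectid import ObjectId
from pymongo import ASCENDING, MongoClient

def read_shard(collection, start_id, end_id):
  # Illustrative only: select a shard by an explicit _id range instead of a
  # skip/limit index offset, so concurrent inserts cannot shift documents
  # between shards.
  return collection.find(
      {'$and': [{'_id': {'$gte': start_id, '$lt': end_id}}]}
  ).sort([('_id', ASCENDING)])

client = MongoClient()  # illustrative connection
cursor = read_shard(client.db.coll,
                    ObjectId('000000000000000000000000'),
                    ObjectId('ffffffffffffffffffffffff'))
```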

[jira] [Updated] (BEAM-7476) Datastore write failures with "[Errno 32] Broken pipe"

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri updated BEAM-7476:

Fix Version/s: (was: 2.16.0)
   2.15.0

> Datastore write failures with "[Errno 32] Broken pipe"
> --
>
> Key: BEAM-7476
> URL: https://issues.apache.org/jira/browse/BEAM-7476
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Affects Versions: 2.11.0
> Environment: dataflow python 2.7
>Reporter: Dmytro Sadovnychyi
>Assignee: Dmytro Sadovnychyi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.15.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (BEAM-7476) Datastore write failures with "[Errno 32] Broken pipe"

2019-08-08 Thread Udi Meiri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udi Meiri resolved BEAM-7476.
-
   Resolution: Fixed
Fix Version/s: 2.16.0

> Datastore write failures with "[Errno 32] Broken pipe"
> --
>
> Key: BEAM-7476
> URL: https://issues.apache.org/jira/browse/BEAM-7476
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp
>Affects Versions: 2.11.0
> Environment: dataflow python 2.7
>Reporter: Dmytro Sadovnychyi
>Assignee: Dmytro Sadovnychyi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.16.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7917) Python datastore v1new fails on retry

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7917?focusedWorklogId=291460=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291460
 ]

ASF GitHub Bot logged work on BEAM-7917:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:44
Start Date: 08/Aug/19 17:44
Worklog Time Spent: 10m 
  Work Description: udim commented on pull request #9294: [BEAM-7917] Fix 
datastore writes failing on retry
URL: https://github.com/apache/beam/pull/9294#discussion_r312161789
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/datastore/v1new/helper.py
 ##
 @@ -100,6 +100,10 @@ def write_mutations(batch, throttler, rpc_stats_callback, throttle_delay=1):
 
   try:
     start_time = time.time()
+    if batch._status == batch._FINISHED:
 
 Review comment:
   This change is unsafe, since it's dealing with the internal state of the 
client and the behavior may change unexpectedly in the future.
   I believe that a better choice is to create a new batch in this method every 
time.
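
A minimal sketch of that suggestion (a hypothetical reshaping of the helper,
not the merged fix):

```
from google.cloud import datastore

def write_mutations(client, entities):
  # Hypothetical signature: take the client and the entities rather than a
  # Batch, and build a fresh Batch on every call so a retried attempt never
  # reuses a batch whose internal state is already finished.
  batch = client.batch()
  batch.begin()
  for entity in entities:
    batch.put(entity)
  batch.commit()

client = datastore.Client()  # illustrative; credentials/project from env
```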
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291460)
Time Spent: 40m  (was: 0.5h)

> Python datastore v1new fails on retry
> -
>
> Key: BEAM-7917
> URL: https://issues.apache.org/jira/browse/BEAM-7917
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp, runner-dataflow
>Affects Versions: 2.14.0
> Environment: Python 3.7 on Dataflow
>Reporter: Dmytro Sadovnychyi
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 782, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 454, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py",
>  line 334, in process
> self._flush_batch()
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py",
>  line 349, in _flush_batch
> throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS // 1000)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", 
> line 197, in wrapper
> return fun(*args, **kwargs)
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/helper.py",
>  line 99, in write_mutations
> batch.commit()
>   File 
> "/usr/local/lib/python3.7/site-packages/google/cloud/datastore/batch.py", 
> line 271, in commit
> raise ValueError("Batch must be in progress to commit()")
> ValueError: Batch must be in progress to commit()



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7820) Add hot key detection to Dataflow Runner

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7820?focusedWorklogId=291459=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291459
 ]

ASF GitHub Bot logged work on BEAM-7820:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:42
Start Date: 08/Aug/19 17:42
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #9270: 
[BEAM-7820] HotKeyDetection
URL: https://github.com/apache/beam/pull/9270#discussion_r312161239
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Clock;
+import java.text.MessageFormat;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HotKeyLogger {
+  Logger LOG = LoggerFactory.getLogger(HotKeyLogger.class);
+
+  /** Clock used to either provide real system time or mocked to virtualize time for testing. */
+  private Clock clock = Clock.SYSTEM;
+
+  /**
+   * The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
+   * minutes.
+   */
+  private long prevHotKeyDetectionLogMs = 0;
+
+  /** Throttles logging the detection to every loggingPeriod. */
+  private final Duration loggingPeriod = Duration.standardMinutes(5);
+
+  HotKeyLogger() {}
+
+  HotKeyLogger(Clock clock) {
+    this.clock = clock;
+  }
+
+  /** Logs a detection of the hot key every 5 minutes. */
+  public void logHotKeyDetection(String userStepName, Duration hotKeyAge) {
+    if (isThrottled()) {
+      return;
+    }
+    LOG.warn(getHotKeyMessage(userStepName, TimeUtil.toCloudDuration(hotKeyAge)));
 
 Review comment:
   Yes, the detection from the service is explicitly only when the hot key is 
causing slowness/ limited parallelism.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291459)
Time Spent: 3h 20m  (was: 3h 10m)

> Add hot key detection to Dataflow Runner
> 
>
> Key: BEAM-7820
> URL: https://issues.apache.org/jira/browse/BEAM-7820
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-dataflow
>Reporter: Sam Rohde
>Assignee: Sam Rohde
>Priority: Minor
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> This tracks adding hot key detection in the Dataflow Runner. 
> There are times when a user's pipeline spuriously slows down due to hot keys. 
> During these times, users are unable to see under the hood at what the 
> pipeline is doing. This adds hot key detection to show the user when their 
> pipeline has a hot key.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Work logged] (BEAM-7917) Python datastore v1new fails on retry

2019-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7917?focusedWorklogId=291457=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291457
 ]

ASF GitHub Bot logged work on BEAM-7917:


Author: ASF GitHub Bot
Created on: 08/Aug/19 17:42
Start Date: 08/Aug/19 17:42
Worklog Time Spent: 10m 
  Work Description: tvalentyn commented on issue #9294: [BEAM-7917] Fix 
datastore writes failing on retry
URL: https://github.com/apache/beam/pull/9294#issuecomment-519618790
 
 
   R: @udim who is more familiar with this IO.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 291457)
Time Spent: 0.5h  (was: 20m)

> Python datastore v1new fails on retry
> -
>
> Key: BEAM-7917
> URL: https://issues.apache.org/jira/browse/BEAM-7917
> Project: Beam
>  Issue Type: Bug
>  Components: io-py-gcp, runner-dataflow
>Affects Versions: 2.14.0
> Environment: Python 3.7 on Dataflow
>Reporter: Dmytro Sadovnychyi
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 782, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 454, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py",
>  line 334, in process
> self._flush_batch()
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py",
>  line 349, in _flush_batch
> throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS // 1000)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", 
> line 197, in wrapper
> return fun(*args, **kwargs)
>   File 
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/helper.py",
>  line 99, in write_mutations
> batch.commit()
>   File 
> "/usr/local/lib/python3.7/site-packages/google/cloud/datastore/batch.py", 
> line 271, in commit
> raise ValueError("Batch must be in progress to commit()")
> ValueError: Batch must be in progress to commit()



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

