[jira] [Work logged] (BEAM-7854) Reading files from local file system does not fully support glob
[ https://issues.apache.org/jira/browse/BEAM-7854?focusedWorklogId=288486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288486 ] ASF GitHub Bot logged work on BEAM-7854: Author: ASF GitHub Bot Created on: 03/Aug/19 23:09 Start Date: 03/Aug/19 23:09 Worklog Time Spent: 10m Work Description: tomerr90 commented on issue #9197: [BEAM-7854] Resolve parent folder recursively in LocalFileSystem matc… URL: https://github.com/apache/beam/pull/9197#issuecomment-517960745 The failed tests get a timeout because of the issue I mentioned. I can modify the implementation to only search for a parent recursively if the string contains glob wildcards, but that feels a bit fragile if the glob spec is ever extended with new wildcards; I would appreciate your thoughts on the issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288486) Time Spent: 1h (was: 50m) > Reading files from local file system does not fully support glob > > > Key: BEAM-7854 > URL: https://issues.apache.org/jira/browse/BEAM-7854 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Tomer Zeltzer >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > Folder structure: > {code:java} > A > B > a=100 > data1 > file1.zst > file2.zst > a=999 > data2 > file6.zst > a=397 > data3 > file7.zst{code} > > Glob: > > {code:java} > /A/B/a=[0-9][0-9][0-9]/*/*{code} > Code: > > {code:java} > input.apply(Create.of(patterns)) > .apply("Matching patterns", FileIO.matchAll()) > .apply(FileIO.readMatches()); > {code} > > input is of type PBegin. > The above code matches 0 files even though, from the glob, it's clear it should match all files. I suspect it's because of line 227, where only the first parent folder is checked while it could be an asterisk in a glob. I believe the right behaviour is to check all parent folders and use the first one that exists. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
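The fix under discussion amounts to walking up from the pattern until a wildcard-free parent directory is found. A minimal sketch, assuming POSIX-style paths; `non_glob_parent` and `_GLOB_CHARS` are hypothetical names for illustration, not Beam's actual API (the real change lives in the Java SDK's LocalFileSystem):

```python
import os.path
import re

# Characters the glob spec treats as wildcards; this set would need updating
# if the spec grows new wildcards, which is the fragility concern raised above.
_GLOB_CHARS = re.compile(r'[*?\[\]{}]')

def non_glob_parent(pattern):
    """Return the deepest parent directory of `pattern` containing no wildcards."""
    parent = os.path.dirname(pattern)
    while _GLOB_CHARS.search(parent):
        parent = os.path.dirname(parent)
    return parent
```

For the reporter's pattern `/A/B/a=[0-9][0-9][0-9]/*/*` this yields `/A/B` as the directory to start matching from, and the recursive walk can be skipped entirely when the input contains no wildcard characters, which addresses the timeout seen in the failed tests.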
[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python
[ https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=288476=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288476 ] ASF GitHub Bot logged work on BEAM-7886: Author: ASF GitHub Bot Created on: 03/Aug/19 21:52 Start Date: 03/Aug/19 21:52 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #9188: [BEAM-7886] Make row coder a standard coder and implement in Python URL: https://github.com/apache/beam/pull/9188#discussion_r310363883 ## File path: sdks/python/apache_beam/typehints/schemas.py ## @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import absolute_import + +from typing import List +from typing import Mapping +from typing import NamedTuple +from typing import Optional +from uuid import uuid4 + +import numpy as np + +from apache_beam.portability.api import schema_pb2 +from apache_beam.typehints.native_type_compatibility import _get_args +from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping +from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple +from apache_beam.typehints.native_type_compatibility import _match_is_optional +from apache_beam.typehints.native_type_compatibility import _safe_issubclass +from apache_beam.typehints.native_type_compatibility import extract_optional_type + + +# Registry of typings for a schema by UUID +class SchemaTypeRegistry(object): + def __init__(self): +self.by_id = {} +self.by_typing = {} + + def add(self, typing, schema): +self.by_id[schema.id] = (typing, schema) + + def get_typing_by_id(self, unique_id): +result = self.by_id.get(unique_id, None) +return result[0] if result is not None else None + + def get_schema_by_id(self, unique_id): +result = self.by_id.get(unique_id, None) +return result[1] if result is not None else None + + +SCHEMA_REGISTRY = SchemaTypeRegistry() + + +_PRIMITIVES = ( +(np.int8, schema_pb2.AtomicType.BYTE), +(np.int16, schema_pb2.AtomicType.INT16), +(np.int32, schema_pb2.AtomicType.INT32), +(np.int64, schema_pb2.AtomicType.INT64), +(np.float32, schema_pb2.AtomicType.FLOAT), +(np.float64, schema_pb2.AtomicType.DOUBLE), +(np.unicode, schema_pb2.AtomicType.STRING), +(np.bool, schema_pb2.AtomicType.BOOLEAN), +(np.bytes_, schema_pb2.AtomicType.BYTES), Review comment: why not use builtin `bytes` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288476) Time Spent: 40m (was: 0.5h) > Make row coder a standard coder and implement in python > --- > > Key: BEAM-7886 > URL: https://issues.apache.org/jira/browse/BEAM-7886 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-java-core, sdk-py-core >Reporter: Brian Hulette >Assignee: Brian Hulette >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
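The reviewers' point across these comments is that several numpy names (`np.bool`, `np.unicode`, and arguably `np.bytes_`) are just re-exports of builtins, so keying the primitives table on builtins would be clearer. A sketch of that suggestion, using stand-in integer constants in place of the real `schema_pb2.AtomicType` enum values (which live in Beam's portability protos), and with the builtin-to-type assignments being my own illustrative choices:

```python
# Stand-ins for schema_pb2.AtomicType enum members; values are arbitrary here.
BYTE, INT16, INT32, INT64 = range(4)
FLOAT, DOUBLE, STRING, BOOLEAN, BYTES = range(4, 9)

# Keyed on builtin types rather than numpy aliases, per the review comments.
_PRIMITIVES = (
    (int, INT64),      # plain Python int mapped to the widest integer type
    (float, DOUBLE),
    (str, STRING),     # what np.unicode aliases on Python 3
    (bool, BOOLEAN),   # np.bool is just the builtin bool
    (bytes, BYTES),    # the "why not use builtin bytes?" point
)

PRIMITIVE_TO_ATOMIC = dict(_PRIMITIVES)
```

Note that `bool` and `int` remain distinct dictionary keys even though `bool` subclasses `int`, since the lookup is on the type objects themselves.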
[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python
[ https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=288475&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288475 ] ASF GitHub Bot logged work on BEAM-7886: Author: ASF GitHub Bot Created on: 03/Aug/19 21:49 Start Date: 03/Aug/19 21:49 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #9188: [BEAM-7886] Make row coder a standard coder and implement in Python URL: https://github.com/apache/beam/pull/9188#discussion_r310363813 ## File path: sdks/python/apache_beam/typehints/schemas.py ## +(np.bool, schema_pb2.AtomicType.BOOLEAN), Review comment: likewise, `np.bool` is just the builtin `bool`, so this is needlessly obfuscated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288475) Time Spent: 0.5h (was: 20m) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
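For context on the hunk these comments review: the `SchemaTypeRegistry` quoted in the diff is a plain id-keyed lookup table. A trimmed, self-contained copy with a usage example; `FakeSchema` is a stand-in for a `schema_pb2.Schema` message, which only needs an `id` field for this sketch:

```python
from collections import namedtuple

# Stand-in for schema_pb2.Schema; the real message comes from Beam's
# portability API and carries far more than an `id` field.
FakeSchema = namedtuple('FakeSchema', ['id'])

class SchemaTypeRegistry(object):
    """Registry mapping a schema's UUID to its (typing, schema) pair."""

    def __init__(self):
        self.by_id = {}

    def add(self, typing_, schema):
        self.by_id[schema.id] = (typing_, schema)

    def get_typing_by_id(self, unique_id):
        result = self.by_id.get(unique_id)
        return result[0] if result is not None else None

    def get_schema_by_id(self, unique_id):
        result = self.by_id.get(unique_id)
        return result[1] if result is not None else None

registry = SchemaTypeRegistry()
registry.add(tuple, FakeSchema(id='abc-123'))
```

Lookups by an unknown id return `None` rather than raising, which is what lets callers probe the registry before converting a typing to a schema from scratch.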
[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python
[ https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=288473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288473 ] ASF GitHub Bot logged work on BEAM-7886: Author: ASF GitHub Bot Created on: 03/Aug/19 21:31 Start Date: 03/Aug/19 21:31 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #9188: [BEAM-7886] Make row coder a standard coder and implement in Python URL: https://github.com/apache/beam/pull/9188#discussion_r310363454 ## File path: sdks/python/apache_beam/typehints/schemas.py ## +(np.unicode, schema_pb2.AtomicType.STRING), Review comment: same comment about `np.unicode` here. Since numpy is not used extensively in beam, it's unclear that this behaves the same as `past.builtins.unicode`, which is what the beam code uses. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288473) Time Spent: 20m (was: 10m) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7886) Make row coder a standard coder and implement in python
[ https://issues.apache.org/jira/browse/BEAM-7886?focusedWorklogId=288471=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288471 ] ASF GitHub Bot logged work on BEAM-7886: Author: ASF GitHub Bot Created on: 03/Aug/19 21:29 Start Date: 03/Aug/19 21:29 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #9188: [BEAM-7886] Make row coder a standard coder and implement in Python URL: https://github.com/apache/beam/pull/9188#discussion_r310363425 ## File path: sdks/python/apache_beam/coders/row_coder_test.py ## @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import + +import logging +import typing +import unittest + +import numpy as np + +from apache_beam.coders import RowCoder +from apache_beam.coders.typecoders import registry as coders_registry +from apache_beam.portability.api import schema_pb2 +from apache_beam.typehints.schemas import typing_to_runner_api + +Person = typing.NamedTuple("Person", [ +("name", np.unicode), +("age", np.int32), +("address", typing.Optional[np.unicode]), +("aliases", typing.List[np.unicode]), Review comment: It's a bit confusing to use `np.unicode` here. 
To stay consistent with the rest of the 2-to-3 efforts in beam, I think it'd be preferable to use `past.builtins.unicode` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288471) Time Spent: 10m Remaining Estimate: 0h -- This message was sent by Atlassian JIRA (v7.6.14#76016)
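On Python 3, `past.builtins.unicode` resolves to the builtin `str`, so the reviewer's suggestion amounts to annotating the fixture with builtins. A hypothetical rewrite of the quoted `Person` fixture along those lines (not the PR's final code, and with illustrative field values):

```python
import typing

# Same shape as the quoted test fixture, but with builtin annotations in
# place of numpy scalar aliases such as np.unicode and np.int32.
Person = typing.NamedTuple("Person", [
    ("name", str),
    ("age", int),
    ("address", typing.Optional[str]),
    ("aliases", typing.List[str]),
])

p = Person(name="Ada", age=36, address=None, aliases=["al"])
```

The NamedTuple shape is unchanged, so a row coder keyed on the field names and annotated types would see the same schema either way.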
[jira] [Updated] (BEAM-7825) Python's DirectRunner emits multiple panes per window and does not discard late data
[ https://issues.apache.org/jira/browse/BEAM-7825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Strokach updated BEAM-7825: -- Description: The documentation for Beam's Windowing and Triggers functionality [states that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window"_. However, it seems that the current behavior of Python's DirectRunner is inconsistent with both of those points. As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, DirectRunner appears to process every data point that it reads from the input stream, irrespective of whether or not the timestamp of that data point is older than the timestamps of the windows that have already been processed. Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} test shows, DirectRunner generates multiple "panes" for the same window, apparently disregarding the notion of a watermark? The Dataflow runner passes both of those end-to-end tests. was: The documentation for Beam's Windowing and Triggers functionality [states that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window"_. However, it seems that the current behavior of Python's DirectRunner is inconsistent with both of those points. As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, DirectRunner appears to process every data point that it reads from the input stream, irrespective of whether or not the timestamp of that data point is older than the timestamps of the windows that have already been processed. 
Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} test shows, DirectRunner generates multiple "panes" for the same window, apparently disregarding the notion of a watermark? The Dataflow runner passes both of those end-to-end tests. Until the limitations of DirectRunner are addressed, maybe they should be listed on the [DirectRunner documentation page](https://beam.apache.org/documentation/runners/direct/)? > Python's DirectRunner emits multiple panes per window and does not discard > late data > > > Key: BEAM-7825 > URL: https://issues.apache.org/jira/browse/BEAM-7825 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Affects Versions: 2.13.0 > Environment: OS: Debian rodete. > Beam versions: 2.15.0.dev. > Python versions: Python 2.7, Python 3.7 >Reporter: Alexey Strokach >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > The documentation for Beam's Windowing and Triggers functionality [states > that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if > you use Beam’s default windowing configuration and default trigger, Beam > outputs the aggregated result when it estimates all data has arrived, and > discards all subsequent data for that window"_. However, it seems that the > current behavior of Python's DirectRunner is inconsistent with both of those > points. As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, > DirectRunner appears to process every data point that it reads from the input > stream, irrespective of whether or not the timestamp of that data point is > older than the timestamps of the windows that have already been processed. > Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} > test shows, DirectRunner generates multiple "panes" for the same window, > apparently disregarding the notion of a watermark? > The Dataflow runner passes both of those end-to-end tests. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
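The semantics these tests probe can be illustrated with a toy model (purely illustrative, not Beam's implementation): fixed windows close once the watermark passes their end, the single pane for a closed window has already fired, and later elements for it are discarded — the documented default behaviour that DirectRunner appears to violate.

```python
def fire_windows(elements, window_size, watermark):
    """Group (timestamp, value) pairs into fixed windows, dropping late data.

    A window [start, end) is considered closed once watermark >= end;
    elements landing in a closed window are discarded.
    """
    panes = {}
    for ts, value in elements:
        window_end = (ts // window_size + 1) * window_size
        if window_end <= watermark:
            continue  # late: the one pane for this window already fired
        panes.setdefault(window_end, []).append(value)
    return panes
```

With a window size of 10 and the watermark at 10, an element stamped 5 is late and dropped, while an element stamped 12 lands in the still-open window ending at 20.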
[jira] [Updated] (BEAM-7825) Python's DirectRunner emits multiple panes per window and does not discard late data
[ https://issues.apache.org/jira/browse/BEAM-7825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Strokach updated BEAM-7825: -- Description: The documentation for Beam's Windowing and Triggers functionality [states that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window"_. However, it seems that the current behavior of Python's DirectRunner is inconsistent with both of those points. As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, DirectRunner appears to process every data point that it reads from the input stream, irrespective of whether or not the timestamp of that data point is older than the timestamps of the windows that have already been processed. Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} test shows, DirectRunner generates multiple "panes" for the same window, apparently disregarding the notion of a watermark? The Dataflow runner passes both of those end-to-end tests. Until the limitations of DirectRunner are addressed, maybe they should be listed on the [DirectRunner documentation page](https://beam.apache.org/documentation/runners/direct/)? was: The documentation for Beam's Windowing and Triggers functionality [states that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window"_. However, it seems that the current behavior of Python's DirectRunner is inconsistent with both of those points. 
As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, DirectRunner appears to process every data point that it reads from the input stream, irrespective of whether or not the timestamp of that data point is older than the timestamps of the windows that have already been processed. Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} test shows, DirectRunner generates multiple "panes" for the same window, apparently disregarding the notion of a watermark? The Dataflow runner passes both of those end-to-end tests. Until the limitations of DirectRunner are addressed, maybe they should be listed on the [DirectRunner documentation page](https://beam.apache.org/documentation/runners/direct/)? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (BEAM-7825) Python's DirectRunner emits multiple panes per window and does not discard late data
[ https://issues.apache.org/jira/browse/BEAM-7825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Strokach updated BEAM-7825: -- Description: The documentation for Beam's Windowing and Triggers functionality [states that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window"_. However, it seems that the current behavior of Python's DirectRunner is inconsistent with both of those points. As the {{StreamingWordGroupIT.test_discard_late_data}} test shows, DirectRunner appears to process every data point that it reads from the input stream, irrespective of whether or not the timestamp of that data point is older than the timestamps of the windows that have already been processed. Furthermore, as the {{StreamingWordGroupIT.test_single_output_per_window}} test shows, DirectRunner generates multiple "panes" for the same window, apparently disregarding the notion of a watermark? The Dataflow runner passes both of those end-to-end tests. Until the limitations of DirectRunner are addressed, maybe they should be listed on the [DirectRunner documentation page](https://beam.apache.org/documentation/runners/direct/)? was: The documentation for Beam's Windowing and Triggers functionality [states that|https://beam.apache.org/documentation/programming-guide/#triggers] _"if you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window"_. However, it seems that the current behavior of Python's DirectRunner is inconsistent with both of those points. 
In my experience, DirectRunner will process every data point that it reads from the input stream, irrespective of whether or not the timestamp of that data point is older than the timestamps of the windows that have already been processed. Furthermore, it regularly generates multiple "panes" for the same window, apparently disregarding the notion of a watermark? An integration test demonstrating the inconsistencies between DirectRunner and Dataflow is provided in the linked PR. Until the limitations of DirectRunner are addressed, maybe they should be listed on the [DirectRunner documentation page](https://beam.apache.org/documentation/runners/direct/)? As far as I understand (I have not seen this explicitly documented anywhere), in the case of Cloud Dataflow, the pipeline will first process all elements that have accumulated in a PubSub subscription before the start of the pipeline, and will then process all new elements which have a timestamp within a certain narrow range of the current time (UTC). Would this be the behavior that DirectRunner should be trying to emulate? While Dataflow does emit a single pane per window (by default) and discards late data, it may be too eager in what data it calls "late"? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?focusedWorklogId=288446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288446 ] ASF GitHub Bot logged work on BEAM-7819: Author: ASF GitHub Bot Created on: 03/Aug/19 15:02 Start Date: 03/Aug/19 15:02 Worklog Time Spent: 10m Work Description: matt-darwin commented on pull request #9232: [BEAM-7819] Python - parse PubSub message_id into attributes property URL: https://github.com/apache/beam/pull/9232#discussion_r310354194 ## File path: sdks/python/apache_beam/io/gcp/pubsub.py ## @@ -127,6 +127,10 @@ def _from_message(msg): """ # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) +# Parse the PubSub message_id and add to attributes +if msg.message_id != None: Review comment: Bit odd: the message_id is returned, but the value currently always seems to be blank. I've tried using the id_label parameter as well to specify message_id, but this still returns nothing. Will continue to investigate. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288446) Time Spent: 50m (was: 40m) > PubsubMessage message parsing is lacking non-attribute fields > - > > Key: BEAM-7819 > URL: https://issues.apache.org/jira/browse/BEAM-7819 > Project: Beam > Issue Type: Bug > Components: io-python-gcp >Reporter: Ahmet Altay >Assignee: Udi Meiri >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > User reported issue: > https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E > """ > Looking at the source code, with my untrained python eyes, I think if the > intention is to include the message id and the publish time in the attributes > attribute of the PubSubMessage type, then the protobuf mapping is missing > something:- > @staticmethod > def _from_proto_str(proto_msg): > """Construct from serialized form of ``PubsubMessage``. > Args: > proto_msg: String containing a serialized protobuf of type > https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage > Returns: > A new PubsubMessage object. > """ > msg = pubsub.types.pubsub_pb2.PubsubMessage() > msg.ParseFromString(proto_msg) > # Convert ScalarMapContainer to dict. > attributes = dict((key, msg.attributes[key]) for key in msg.attributes) > return PubsubMessage(msg.data, attributes) > The protobuf definition is here:- > https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage > and so it looks as if the message_id and publish_time are not being parsed as > they are separate from the attributes. Perhaps the PubsubMessage class needs > expanding to include these as attributes, or they would need adding to the > dictionary for attributes. This would only need doing for the _from_proto_str > as obviously they would not need to be populated when transmitting a message > to PubSub.
> My python is not great; I'm assuming the latter option would need to look
> something like this?
> attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
> attributes.update({'message_id': msg.message_id,
>                    'publish_time': msg.publish_time})
> return PubsubMessage(msg.data, attributes)
> """

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
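The user's proposed change in the quoted thread above can be sketched without the Pub/Sub client library by faking the parsed protobuf message. The `SimpleNamespace` stand-in and its field values below are illustrative only; the real `pubsub_pb2.PubsubMessage` API is not used here:

```python
from types import SimpleNamespace

def merge_metadata_into_attributes(msg):
    # Copy the map-like attributes field into a plain dict,
    # as _from_proto_str already does today.
    attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
    # Fold the top-level protobuf fields into the same dict,
    # per the user's proposal on the mailing list.
    attributes.update({'message_id': msg.message_id,
                       'publish_time': msg.publish_time})
    return attributes

# Illustrative stand-in for a parsed PubsubMessage protobuf.
msg = SimpleNamespace(attributes={'k': 'v'},
                      message_id='1234',
                      publish_time='2019-08-03T13:07:00Z')
print(merge_metadata_into_attributes(msg))
# {'k': 'v', 'message_id': '1234', 'publish_time': '2019-08-03T13:07:00Z'}
```

As the user notes, this merging would apply only on the receive path; the keys `message_id` and `publish_time` could in principle collide with user-set attributes of the same names, which is one argument for exposing them as separate fields instead.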
[jira] [Commented] (BEAM-7836) Provide a way to exclude unit / integration tests with incompatible syntax from running under particular version of Python.
[ https://issues.apache.org/jira/browse/BEAM-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899464#comment-16899464 ]

yoshiki obata commented on BEAM-7836:
-

BEAM-3713 needs to be done first; once the tests run under pytest, pytest_ignore_collect should be useful. The Google App Engine sample-code tests are skipped under Python 3 as follows:
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/appengine/standard/conftest.py

> Provide a way to exclude unit / integration tests with incompatible syntax
> from running under particular version of Python.
> ---
>
> Key: BEAM-7836
> URL: https://issues.apache.org/jira/browse/BEAM-7836
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core, testing
> Reporter: Valentyn Tymofieiev
> Priority: Major
>
> Beam currently runs Python tests in 4 versions of Python: 2.7, 3.5, 3.6, 3.7.
> Some Python 3 tests may include code that is considered incorrect syntax in
> older versions of Python, and such tests may break test suites they are
> not intended for; see an example: [1]
> We use `exec` as a workaround in (very few) tests, example: [2], but it is
> less convenient to work with such code; for example, it is harder to debug or
> work with in an IDE.
> We should find best practices to deal with this problem and introduce a
> solution to Beam.
> cc: [~markflyhigh], [~udim], [~yoshiki.obata]
> [1] https://github.com/apache/beam/pull/8505#issuecomment-498441270
> [2] https://github.com/apache/beam/blob/6cf3b1133658e963b6cc23f780480c3359e79ad6/sdks/python/apache_beam/internal/pickler_test.py#L103

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
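The conftest.py approach linked above can be sketched as follows. The module name in the ignore list is hypothetical; a real Beam conftest would list the actual Python-3-only test files, and pytest reads the module-level `collect_ignore` name when collecting tests:

```python
# conftest.py -- minimal sketch of version-gated test collection with pytest,
# modeled on the python-docs-samples conftest linked above.
import sys

def ignored_modules(version_info):
    """Return test modules pytest should skip for this interpreter version."""
    ignore = []
    if version_info[0] < 3:
        # This module's syntax only parses under Python 3,
        # so keep Python 2 from even importing it.
        # (Hypothetical file name, for illustration.)
        ignore.append('py3_only_syntax_test.py')
    return ignore

# pytest consults this module-level list during collection.
collect_ignore = ignored_modules(sys.version_info)
```

With this in place, an incompatible-syntax test file never gets imported on Python 2, so it cannot break the suite with a SyntaxError at collection time, which avoids the `exec` workaround described in the issue.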
[jira] [Work logged] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?focusedWorklogId=288440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288440 ]

ASF GitHub Bot logged work on BEAM-7819:

Author: ASF GitHub Bot
Created on: 03/Aug/19 13:07
Start Date: 03/Aug/19 13:07
Worklog Time Spent: 10m

Work Description: matt-darwin commented on pull request #9232: [BEAM-7819] Python - parse PubSub message_id into attributes property
URL: https://github.com/apache/beam/pull/9232#discussion_r310351040

## File path: sdks/python/apache_beam/io/gcp/pubsub.py
## @@ -127,6 +127,10 @@ def _from_message(msg):
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+    # Parse the PubSub message_id and add to attributes
+    if msg.message_id != None:

Review comment: It shouldn't be; I was simply being defensive. I've missed something here, however: this is only working for the DirectRunner at the moment, as I hadn't amended the _from_protobuf function. The parsing of the inbound object is slightly different there, so I'm now checking whether it can be done in the same manner, and will amend (and remove the unnecessary check).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 288440)
Time Spent: 40m (was: 0.5h)

> PubsubMessage message parsing is lacking non-attribute fields
> -
>
> Key: BEAM-7819
> URL: https://issues.apache.org/jira/browse/BEAM-7819
> Project: Beam
> Issue Type: Bug
> Components: io-python-gcp
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> User reported issue:
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained python eyes, I think if the
> intention is to include the message id and the publish time in the attributes
> attribute of the PubSubMessage type, then the protobuf mapping is missing
> something:
> @staticmethod
> def _from_proto_str(proto_msg):
>   """Construct from serialized form of ``PubsubMessage``.
>   Args:
>     proto_msg: String containing a serialized protobuf of type
>     https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>   Returns:
>     A new PubsubMessage object.
>   """
>   msg = pubsub.types.pubsub_pb2.PubsubMessage()
>   msg.ParseFromString(proto_msg)
>   # Convert ScalarMapContainer to dict.
>   attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>   return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed, as
> they are separate from the attributes. Perhaps the PubsubMessage class needs
> expanding to include these as attributes, or they would need adding to the
> dictionary for attributes. This would only need doing for _from_proto_str,
> as obviously they would not need to be populated when transmitting a message
> to PubSub.
> My python is not great; I'm assuming the latter option would need to look
> something like this?
> attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
> attributes.update({'message_id': msg.message_id,
>                    'publish_time': msg.publish_time})
> return PubsubMessage(msg.data, attributes)
> """

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
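As a side note to the review exchange above about `msg.message_id != None`: the `!= None` form works for protobuf messages, but identity comparison (`is not None`) is the idiomatic choice recommended by PEP 8, because `!=` dispatches to `__eq__`/`__ne__` and can be fooled by objects with custom equality. A minimal illustration (the `AlwaysEqual` class is contrived and unrelated to Beam):

```python
class AlwaysEqual:
    # Pathological __eq__ that claims equality with everything,
    # including None. Python derives __ne__ from it by negation.
    def __eq__(self, other):
        return True

obj = AlwaysEqual()
print(obj != None)      # False: equality dispatch, fooled by __eq__
print(obj is not None)  # True: identity check cannot be fooled
```

For an object that behaves normally under equality the two checks agree, which is why the reviewer's defensive check still worked; `is not None` is simply the safer, conventional spelling.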
[jira] [Work logged] (BEAM-1893) Add CouchbaseIO
[ https://issues.apache.org/jira/browse/BEAM-1893?focusedWorklogId=288438&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288438 ]

ASF GitHub Bot logged work on BEAM-1893:

Author: ASF GitHub Bot
Created on: 03/Aug/19 12:23
Start Date: 03/Aug/19 12:23
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8152: [DoNotMerge][BEAM-1893] Implementation of CouchbaseIO
URL: https://github.com/apache/beam/pull/8152#issuecomment-517920717

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 288438)
Time Spent: 4h 10m (was: 4h)

> Add CouchbaseIO
> ---
>
> Key: BEAM-1893
> URL: https://issues.apache.org/jira/browse/BEAM-1893
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: Xu Mingmin
> Assignee: LI Guobao
> Priority: Major
> Labels: Couchbase, IO, features
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> Create a {{CouchbaseIO}} for Couchbase database.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Work logged] (BEAM-6829) Duplicate metric warnings clutter log
[ https://issues.apache.org/jira/browse/BEAM-6829?focusedWorklogId=288428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288428 ]

ASF GitHub Bot logged work on BEAM-6829:

Author: ASF GitHub Bot
Created on: 03/Aug/19 10:23
Start Date: 03/Aug/19 10:23
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8585: [BEAM-6829] Use transform name for metric namespace if none provided
URL: https://github.com/apache/beam/pull/8585#issuecomment-517914070

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 288428)
Time Spent: 1h 20m (was: 1h 10m)

> Duplicate metric warnings clutter log
> -
>
> Key: BEAM-6829
> URL: https://issues.apache.org/jira/browse/BEAM-6829
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.11.0
> Reporter: Thomas Weise
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Logs fill up quickly with these warnings:
> {code:java}
> WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already
> contains a Metric with the name ...{code}

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)