[jira] [Commented] (BEAM-5931) Rollback PR/6899
[ https://issues.apache.org/jira/browse/BEAM-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676260#comment-16676260 ] Ruoyun Huang commented on BEAM-5931: + [~chamikara], [~szewinho], [~kasiak], [~dariusz.aniszewski] Added area experts (according to this page: [https://cwiki.apache.org/confluence/display/BEAM/Works+in+Progress]). Folks, I'd like to ask for suggestions on which way would be the easiest/best to fix PerformanceTests_TextIOIT (more details in my original questions: [https://bit.ly/2qui6Ot]). Thanks a lot, everyone. > Rollback PR/6899 > > > Key: BEAM-5931 > URL: https://issues.apache.org/jira/browse/BEAM-5931 > Project: Beam > Issue Type: Task > Components: beam-model, runner-dataflow >Reporter: Luke Cwik >Assignee: Ruoyun Huang >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > To roll back this change, one must either: > 1) Update the nexmark / perf test framework to use the Dataflow worker jar. > This requires adding > {code} > "--dataflowWorkerJar=${dataflowWorkerJar}", > "--workerHarnessContainerImage=", > {code} > when running the tests. > OR > 2) Update the Dataflow worker image with code that contains the rollback of > PR/6899 and then roll back PR/6899 in GitHub with the updated Dataflow worker > image. > #1 is preferable since we will no longer have tests running that don't use a > Dataflow worker jar built from GitHub HEAD. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-5979) Support DATE and TIME
Rui Wang created BEAM-5979: -- Summary: Support DATE and TIME Key: BEAM-5979 URL: https://issues.apache.org/jira/browse/BEAM-5979 Project: Beam Issue Type: Bug Components: dsl-sql Reporter: Rui Wang Assignee: Rui Wang Right now, BeamSQL uses Schema's DATETIME field to store all time-related data. However, BeamSQL doesn't correctly implement how TIME and DATE should be converted to Joda's datetime. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
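The conversion problem can be illustrated outside of Beam entirely: with only a single DATETIME field available, a SQL DATE has to be widened to a full timestamp (conventionally midnight) and a SQL TIME has to be pinned to a reference date (conventionally the Unix epoch). A minimal Python sketch of that kind of mapping — illustrative only, with hypothetical helper names, not BeamSQL's actual implementation:

```python
import datetime

# Reference date used to pin a pure TIME value into a datetime field.
EPOCH_DAY = datetime.date(1970, 1, 1)

def date_to_datetime(d):
    # Widen a SQL DATE to a timestamp at midnight of that day.
    return datetime.datetime(d.year, d.month, d.day)

def time_to_datetime(t):
    # Pin a SQL TIME to the epoch date so it fits a DATETIME field.
    return datetime.datetime.combine(EPOCH_DAY, t)

dt = date_to_datetime(datetime.date(2018, 11, 6))
tm = time_to_datetime(datetime.time(5, 7, 0))
```

Round-tripping then means stripping the time (or date) component back off on read, which is the part the issue says is not handled correctly today.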
[jira] [Work started] (BEAM-5979) Support DATE and TIME
[ https://issues.apache.org/jira/browse/BEAM-5979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on BEAM-5979 started by Rui Wang. -- > Support DATE and TIME > - > > Key: BEAM-5979 > URL: https://issues.apache.org/jira/browse/BEAM-5979 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > > Right now, BeamSQL uses Schema's DATETIME field to save all time related > data. However, BeamSQL doesn't implement correctly how TIME and DATE should > be converted to Joda's datetime. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-5696) add table name to row type when creating BeamCalciteTable
[ https://issues.apache.org/jira/browse/BEAM-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang closed BEAM-5696. -- Resolution: Not A Problem Fix Version/s: Not applicable > add table name to row type when creating BeamCalciteTable > - > > Key: BEAM-5696 > URL: https://issues.apache.org/jira/browse/BEAM-5696 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Fix For: Not applicable > > > see: > [BeamCalciteTable.java#L56|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java#L56] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-5883) Array and Map value cannot be null in Row
[ https://issues.apache.org/jira/browse/BEAM-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang closed BEAM-5883. -- Resolution: Fixed Fix Version/s: Not applicable > Array and Map value cannot be null in Row > - > > Key: BEAM-5883 > URL: https://issues.apache.org/jira/browse/BEAM-5883 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Fix For: Not applicable > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-5884) Allow nested types have null value.
[ https://issues.apache.org/jira/browse/BEAM-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang closed BEAM-5884. -- Resolution: Fixed Fix Version/s: Not applicable > Allow nested types have null value. > --- > > Key: BEAM-5884 > URL: https://issues.apache.org/jira/browse/BEAM-5884 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Fix For: Not applicable > > Time Spent: 4h > Remaining Estimate: 0h > > We could allow arbitrary combinations of nested types to have null values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (BEAM-5892) Allow registering UDF with the same method name but different argument list
[ https://issues.apache.org/jira/browse/BEAM-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang closed BEAM-5892. -- Resolution: Fixed Fix Version/s: Not applicable > Allow registering UDF with the same method name but different argument list > --- > > Key: BEAM-5892 > URL: https://issues.apache.org/jira/browse/BEAM-5892 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Fix For: Not applicable > > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas
[ https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=162904=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162904 ] ASF GitHub Bot logged work on BEAM-4461: Author: ASF GitHub Bot Created on: 06/Nov/18 06:10 Start Date: 06/Nov/18 06:10 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #6832: [BEAM-4461] CoGroup transforms for schemas. URL: https://github.com/apache/beam/pull/6832#issuecomment-436142009 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162904) Time Spent: 17h 20m (was: 17h 10m) > Create a library of useful transforms that use schemas > -- > > Key: BEAM-4461 > URL: https://issues.apache.org/jira/browse/BEAM-4461 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Time Spent: 17h 20m > Remaining Estimate: 0h > > e.g. JoinBy(fields). Project, Filter, etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162897 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 06/Nov/18 05:07 Start Date: 06/Nov/18 05:07 Worklog Time Spent: 10m Work Description: amaliujia edited a comment on issue #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950#issuecomment-436041510 Actually I think Row always returns `ReadableDateTime` ([Row.java#L169](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L169)). Row does accept `AbstractInstant` as input: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L611 So would `ReadableDateTime` be the most precise type for SQL functions? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162897) Time Spent: 1h 10m (was: 1h) > use AbstractInstant as DATEITME type in functions > - > > Key: BEAM-5976 > URL: https://issues.apache.org/jira/browse/BEAM-5976 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Xu Mingmin >Assignee: Xu Mingmin >Priority: Minor > Time Spent: 1h 10m > Remaining Estimate: 0h > > refer to discussion in > [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5978) portableWordCount gradle task not working
[ https://issues.apache.org/jira/browse/BEAM-5978?focusedWorklogId=162864=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162864 ] ASF GitHub Bot logged work on BEAM-5978: Author: ASF GitHub Bot Created on: 06/Nov/18 02:58 Start Date: 06/Nov/18 02:58 Worklog Time Spent: 10m Work Description: angoenka opened a new pull request #6954: [BEAM-5978] Adding libltdl7 to flink job server docker and correctly picking the docker executable. URL: https://github.com/apache/beam/pull/6954 **Please** add a meaningful description for your change here Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | --- | --- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162864) Time Spent: 10m Remaining Estimate: 0h > portableWordCount gradle task not working >
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162863&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162863 ] ASF GitHub Bot logged work on BEAM-4444: Author: ASF GitHub Bot Created on: 06/Nov/18 02:54 Start Date: 06/Nov/18 02:54 Worklog Time Spent: 10m Work Description: ihji commented on issue #6763: [BEAM-4444] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#issuecomment-436114110 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162863) Time Spent: 5.5h (was: 5h 20m) > Parquet IO for Python SDK > - > > Key: BEAM-4444 > URL: https://issues.apache.org/jira/browse/BEAM-4444 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 5.5h > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162862&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162862 ] ASF GitHub Bot logged work on BEAM-4444: Author: ASF GitHub Bot Created on: 06/Nov/18 02:52 Start Date: 06/Nov/18 02:52 Worklog Time Spent: 10m Work Description: ihji commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230985917 ## File path: sdks/python/apache_beam/io/parquetio_test.py ## @@ -0,0 +1,376 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +from __future__ import absolute_import + +import json +import logging +import os +import sys +import tempfile +import unittest + +import hamcrest as hc +import pyarrow as pa +import pyarrow.parquet as pq + +from apache_beam import Create +from apache_beam import Map +from apache_beam.io import filebasedsource +from apache_beam.io import source_test_utils +from apache_beam.io.iobase import RangeTracker +from apache_beam.io.parquetio import ReadAllFromParquet +from apache_beam.io.parquetio import ReadFromParquet +from apache_beam.io.parquetio import WriteToParquet +from apache_beam.io.parquetio import _create_parquet_sink +from apache_beam.io.parquetio import _create_parquet_source +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher + + +class TestParquet(unittest.TestCase): + _temp_files = [] + + @classmethod + def setUpClass(cls): +# Method has been renamed in Python 3 +if sys.version_info[0] < 3: + cls.assertCountEqual = cls.assertItemsEqual + + def setUp(self): +# Reducing the size of thread pools. Without this test execution may fail in +# environments with limited amount of resources. +filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2 + + def tearDown(self): +for path in self._temp_files: + if os.path.exists(path): +os.remove(path) +parent = os.path.dirname(path) +if not os.listdir(parent): + os.rmdir(parent) +self._temp_files = [] + + RECORDS = [{'name': 'Thomas', + 'favorite_number': 1, + 'favorite_color': 'blue'}, {'name': 'Henry', Review comment: Just copied from avroio_test sample data. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162862) Time Spent: 5h 20m (was: 5h 10m) > Parquet IO for Python SDK > - > > Key: BEAM-4444 > URL: https://issues.apache.org/jira/browse/BEAM-4444 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-5978) portableWordCount gradle task not working
Ankur Goenka created BEAM-5978: -- Summary: portableWordCount gradle task not working Key: BEAM-5978 URL: https://issues.apache.org/jira/browse/BEAM-5978 Project: Beam Issue Type: Bug Components: examples-python Reporter: Ankur Goenka Assignee: Ankur Goenka A few issues with the portableWordCount gradle task when running with the jobserver docker image: 1. The jobserver docker image mount fails on Linux. 2. Docker cannot read and write to the local file system. 3. Docker from the jobserver docker container requires the libltdl7 library on Linux. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5879) TFRecordio not Py3 compatible
[ https://issues.apache.org/jira/browse/BEAM-5879?focusedWorklogId=162852=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162852 ] ASF GitHub Bot logged work on BEAM-5879: Author: ASF GitHub Bot Created on: 06/Nov/18 02:07 Start Date: 06/Nov/18 02:07 Worklog Time Spent: 10m Work Description: tvalentyn commented on a change in pull request #6953: [BEAM-5879 ] Make write_record() in tfrecordio.py py3 compatible URL: https://github.com/apache/beam/pull/6953#discussion_r230980281 ## File path: sdks/python/apache_beam/io/tfrecordio.py ## @@ -78,7 +78,10 @@ def _masked_crc32c(cls, value, crc32c_fn=_default_crc32c_fn): Masked crc32c checksum. """ -crc = crc32c_fn(value) +if isinstance(value, bytes): Review comment: Before this change, the default codepath was: `crc = crc32c_fn(value)` I assume this did not work on Python 3, so, I suspected that on Python 3 `value` is a `str`. Are you saying that `value` is sometimes `str` and sometimes `bytes` on Python 3? If yes, why is value not always the same type? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162852) Time Spent: 40m (was: 0.5h) > TFRecordio not Py3 compatible > - > > Key: BEAM-5879 > URL: https://issues.apache.org/jira/browse/BEAM-5879 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core > Environment: python 3.5 >Reporter: Ruoyu Liu >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > Error when trying to write to file. Initial issue in line 103 in > io/tfrecordio.py, after making the content to bytes, there will be > segmentation fault when reading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas
[ https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=162835=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162835 ] ASF GitHub Bot logged work on BEAM-4461: Author: ASF GitHub Bot Created on: 06/Nov/18 01:26 Start Date: 06/Nov/18 01:26 Worklog Time Spent: 10m Work Description: reuvenlax commented on a change in pull request #6832: [BEAM-4461] CoGroup transforms for schemas. URL: https://github.com/apache/beam/pull/6832#discussion_r230973954 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java ## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; + +/** + * A transform that performs equijoins across multiple schema {@link PCollection}s. + * + * This transform has similarites to {@link CoGroupByKey}, however works on PCollections that + * have schemas. This allows users of the transform to simply specify schema fields to join on. The + * output type of the transform is a {@literal KV} where the value contains one field for + * every input PCollection and the key represents the fields that were joined on. By default the + * cross product is not expanded, so all fields in the output row are array fields. + * + * For example, the following demonstrates joining three PCollections on the "user" and "country" + * fields. 
+ * + * {@code + * TupleTag<Row> input1Tag = new TupleTag<>("input1"); + * TupleTag<Row> input2Tag = new TupleTag<>("input2"); + * TupleTag<Row> input3Tag = new TupleTag<>("input3"); + * PCollection<KV<Row, Row>> joined = PCollectionTuple + * .of(input1Tag, input1) + * .and(input2Tag, input2) + * .and(input3Tag, input3) + * .apply(CoGroup.byFieldNames("user", "country")); + * } + * + * In the above case, the key schema will contain the two string fields "user" and "country"; in + * this case, the schemas for Input1, Input2, Input3 must all have fields named "user" and + * "country". The value schema will contain three array-of-Row fields named "input1", "input2" and Review comment: Also part of the advantage of this API is not needing CoGbkResult. CoGbkResult is sort of a poor person's schema via use of Java generics. Since we know the actual types here, we can do better by just returning a row. Also we want to remove the necessity to use TupleTags (allowing the user to just use strings is nicer, and since we know the types of everything via schemas there's no need for typed TupleTags), and without TupleTags CoGbkResult makes even less sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking
[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas
[ https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=162833=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162833 ] ASF GitHub Bot logged work on BEAM-4461: Author: ASF GitHub Bot Created on: 06/Nov/18 01:24 Start Date: 06/Nov/18 01:24 Worklog Time Spent: 10m Work Description: reuvenlax commented on a change in pull request #6832: [BEAM-4461] CoGroup transforms for schemas. URL: https://github.com/apache/beam/pull/6832#discussion_r230973653 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java ## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; + +/** + * A transform that performs equijoins across multiple schema {@link PCollection}s. + * + * This transform has similarites to {@link CoGroupByKey}, however works on PCollections that + * have schemas. This allows users of the transform to simply specify schema fields to join on. The + * output type of the transform is a {@literal KV} where the value contains one field for + * every input PCollection and the key represents the fields that were joined on. By default the + * cross product is not expanded, so all fields in the output row are array fields. + * + * For example, the following demonstrates joining three PCollections on the "user" and "country" + * fields. 
+ * + * {@code + * TupleTag<Row> input1Tag = new TupleTag<>("input1"); + * TupleTag<Row> input2Tag = new TupleTag<>("input2"); + * TupleTag<Row> input3Tag = new TupleTag<>("input3"); + * PCollection<KV<Row, Row>> joined = PCollectionTuple + * .of(input1Tag, input1) + * .and(input2Tag, input2) + * .and(input3Tag, input3) + * .apply(CoGroup.byFieldNames("user", "country")); + * } + * + * In the above case, the key schema will contain the two string fields "user" and "country"; in + * this case, the schemas for Input1, Input2, Input3 must all have fields named "user" and + * "country". The value schema will contain three array-of-Row fields named "input1", "input2" and Review comment: Added a more detailed comment (though I used Java POJOs to illustrate the schema). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162833) Time Spent: 17h (was: 16h 50m) > Create a library of useful transforms that use schemas > -- > > Key: BEAM-4461 > URL: https://issues.apache.org/jira/browse/BEAM-4461 > Project: Beam > Issue Type: Sub-task > Components:
[jira] [Work logged] (BEAM-4461) Create a library of useful transforms that use schemas
[ https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=162832=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162832 ] ASF GitHub Bot logged work on BEAM-4461: Author: ASF GitHub Bot Created on: 06/Nov/18 01:23 Start Date: 06/Nov/18 01:23 Worklog Time Spent: 10m Work Description: reuvenlax commented on a change in pull request #6832: [BEAM-4461] CoGroup transforms for schemas. URL: https://github.com/apache/beam/pull/6832#discussion_r230973566 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java ## @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.beam.sdk.TestUtils.KvMatcher; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; + +/** Tests for {@link CoGroup}. */ +public class CoGroupTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + private static final Schema JOIN_SCHEMA_1 = Review comment: Renamed variables to not reference join. However CoGroup is a join primitive. In fact while a followup PR will add an expandCrossProduct method to CoGroup (which will give you simply a collection of rows), advanced users might still use the non expanded version as it provides opportunities for optimization (e.g. if you know some expected distributions). 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162832) Time Spent: 16h 50m (was: 16h 40m) > Create a library of useful transforms that use schemas > -- > > Key: BEAM-4461 > URL: https://issues.apache.org/jira/browse/BEAM-4461 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Time Spent: 16h 50m > Remaining Estimate: 0h > > e.g. JoinBy(fields). Project, Filter, etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
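The review comment above describes CoGroup's non-expanded result: for each key, one iterable of rows per input, which is what leaves room for the optimizations mentioned. A plain-Python sketch of that shape follows; dicts stand in for Beam `Row`s and the function name `cogroup` is illustrative, not the Beam API.

```python
from collections import defaultdict

def cogroup(by, **inputs):
    # Group rows from several named inputs by a tuple of key fields.
    # The non-expanded result keeps one list of rows per input per key,
    # rather than eagerly expanding the cross product.
    grouped = defaultdict(lambda: {tag: [] for tag in inputs})
    for tag, rows in inputs.items():
        for row in rows:
            key = tuple(row[f] for f in by)
            grouped[key][tag].append(row)
    return dict(grouped)

result = cogroup(
    by=["user"],
    purchases=[{"user": "a", "item": "x"}],
    reviews=[{"user": "a", "stars": 5}, {"user": "b", "stars": 3}],
)
```

An `expandCrossProduct`-style step would then flatten each per-key entry into joined rows; keeping the grouped form lets a caller exploit known key distributions first.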
[jira] [Work logged] (BEAM-5879) TFRecordio not Py3 compatible
[ https://issues.apache.org/jira/browse/BEAM-5879?focusedWorklogId=162827=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162827 ] ASF GitHub Bot logged work on BEAM-5879: Author: ASF GitHub Bot Created on: 06/Nov/18 01:13 Start Date: 06/Nov/18 01:13 Worklog Time Spent: 10m Work Description: ruoyu90 commented on a change in pull request #6953: [BEAM-5879] Make write_record() in tfrecordio.py py3 compatible URL: https://github.com/apache/beam/pull/6953#discussion_r230969844 ## File path: sdks/python/apache_beam/io/tfrecordio.py ## @@ -78,7 +78,10 @@ def _masked_crc32c(cls, value, crc32c_fn=_default_crc32c_fn): Masked crc32c checksum. """ -crc = crc32c_fn(value) +if isinstance(value, bytes): Review comment: In Python 3 we cannot call encode() on bytes, so it will break if we go through the else path. Yes, that was a typo, thanks for the reminder! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162827) Time Spent: 0.5h (was: 20m) > TFRecordio not Py3 compatible > - > > Key: BEAM-5879 > URL: https://issues.apache.org/jira/browse/BEAM-5879 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core > Environment: python 3.5 >Reporter: Ruoyu Liu >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > Error when trying to write to a file. The initial issue is at line 103 in > io/tfrecordio.py; after converting the content to bytes, there is a > segmentation fault when reading. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5915) Python postcommit test_read_metrics times out
[ https://issues.apache.org/jira/browse/BEAM-5915?focusedWorklogId=162821=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162821 ] ASF GitHub Bot logged work on BEAM-5915: Author: ASF GitHub Bot Created on: 06/Nov/18 00:57 Start Date: 06/Nov/18 00:57 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #6922: [BEAM-5915] Timeout http requests, if they got stuck, at dataflow job creation time URL: https://github.com/apache/beam/pull/6922#discussion_r230969268 ## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ## @@ -428,14 +429,19 @@ def __init__(self, options): credentials = None else: credentials = get_service_credentials() + +# Use 60 second timeout avoid hangs during network flakiness. +http_client = httplib2.Http(timeout=60) Review comment: Done. It is the socket timeout (https://github.com/httplib2/httplib2/blob/8b65b523359f0da7e91e130e4877f941cf4511d5/python2/httplib2/__init__.py#L1588) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162821) Time Spent: 40m (was: 0.5h) > Python postcommit test_read_metrics times out > - > > Key: BEAM-5915 > URL: https://issues.apache.org/jira/browse/BEAM-5915 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Henning Rohde >Assignee: Ahmet Altay >Priority: Major > Labels: currently-failing > Time Spent: 40m > Remaining Estimate: 0h > > Happened a couple of times: > https://scans.gradle.com/s/u7z2b7mwyghwi/console-log?task=:beam-sdks-python:validatesRunnerBatchTests#L142 > https://scans.gradle.com/s/b3zz6oyq3ueuw/console-log?task=:beam-sdks-python:validatesRunnerBatchTests > == > ERROR: test_read_metrics > (apache_beam.transforms.ptransform_test.PTransformTest) > -- > Traceback (most recent call last): > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/build/gradleenv/local/lib/python2.7/site-packages/nose/plugins/multiprocess.py", > line 812, in run > test(orig) > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/build/gradleenv/local/lib/python2.7/site-packages/nose/case.py", > line 45, in __call__ > return self.run(*arg, **kwarg) > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/build/gradleenv/local/lib/python2.7/site-packages/nose/case.py", > line 133, in run > self.runTest(result) > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/build/gradleenv/local/lib/python2.7/site-packages/nose/case.py", > line 151, in runTest > test(result) > File "/usr/lib/python2.7/unittest/case.py", line 393, in __call__ > return self.run(*args, **kwds) > File "/usr/lib/python2.7/unittest/case.py", line 329, in run > testMethod() > File > 
"/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/apache_beam/transforms/ptransform_test.py", > line 211, in test_read_metrics > res = pipeline.run() > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/apache_beam/testing/test_pipeline.py", > line 107, in run > else test_runner_api)) > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/apache_beam/pipeline.py", > line 405, in run > self._options).run(False) > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/apache_beam/pipeline.py", > line 418, in run > return self.runner.run_pipeline(self) > File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/ > src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py", line > 70, in run_pipeline > self.result.cancel() > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py", > line 1208, in cancel > return self.state > File > "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Py_VR_Dataflow/src/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py", > line 1148, in state > self._update_job() > File >
[jira] [Work logged] (BEAM-5778) Add integrations of Metrics API to Big Query for SyntheticSources load tests in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-5778?focusedWorklogId=162815=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162815 ] ASF GitHub Bot logged work on BEAM-5778: Author: ASF GitHub Bot Created on: 06/Nov/18 00:41 Start Date: 06/Nov/18 00:41 Worklog Time Spent: 10m Work Description: aaltay commented on issue #6943: [BEAM-5778] Add integrations of Metrics API to Big Query for SyntheticSources load tests in Python SDK URL: https://github.com/apache/beam/pull/6943#issuecomment-436089342 @chamikaramj @swegner Could either one of you review this, please? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162815) Time Spent: 50m (was: 40m) > Add integrations of Metrics API to Big Query for SyntheticSources load tests > in Python SDK > -- > > Key: BEAM-5778 > URL: https://issues.apache.org/jira/browse/BEAM-5778 > Project: Beam > Issue Type: Improvement > Components: testing >Reporter: Kasia Kucharczyk >Assignee: Kasia Kucharczyk >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > Right now the Metrics API collects only basic metrics for SyntheticSources > load tests (Python SDK). These metrics should be stored in BigQuery so they > can be presented on performance dashboards. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5957) [beam_PerformanceTests_Python] Consistently failing due to failing integration test.
[ https://issues.apache.org/jira/browse/BEAM-5957?focusedWorklogId=162814=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162814 ] ASF GitHub Bot logged work on BEAM-5957: Author: ASF GitHub Bot Created on: 06/Nov/18 00:40 Start Date: 06/Nov/18 00:40 Worklog Time Spent: 10m Work Description: aaltay closed pull request #6938: [BEAM-5957] Fix beam_PerformanceTests_Python Jenkins and Gradle configs URL: https://github.com/apache/beam/pull/6938 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.test-infra/jenkins/job_PerformanceTests_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_Python.groovy index 7a856004190..40944dcb378 100644 --- a/.test-infra/jenkins/job_PerformanceTests_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_Python.groovy @@ -53,6 +53,8 @@ job('beam_PerformanceTests_Python'){ beam_it_class: 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it', beam_prebuilt: 'true', // skip beam prebuild beam_python_sdk_location : 'build/apache-beam.tar.gz', + beam_runner : 'TestDataflowRunner', + beam_it_timeout : '1200', beam_it_args : pipelineArgsJoined, ] diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index b43edf66e28..147a3646604 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -217,11 +217,16 @@ def basicTestOpts = [ "--process-timeout=4500", // timeout of whole command execution ] -def mapToArgString(argMap) { +static def mapToArgString(argMap) { def argList = [] argMap.each { k, v -> if (v in List) { v = "\"${v.join(' ')}\"" +} else if (v in String && v.contains(' ')) { + // We should use double quote around the arg value if it contains series + // of flags joined with space. 
Otherwise, commandline parsing of the + // shell script will be broken. + v = "\"${v.replace('"', '')}\"" } argList.add("--$k $v") } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162814) Time Spent: 0.5h (was: 20m) > [beam_PerformanceTests_Python] Consistently failing due to failing > integration test. > > > Key: BEAM-5957 > URL: https://issues.apache.org/jira/browse/BEAM-5957 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Daniel Oliveira >Assignee: Mark Liu >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > Most recent failure: > [https://builds.apache.org/job/beam_PerformanceTests_Python/1634/] > I haven't been able to debug the root cause of the failure with what I know. > From what I can tell the performance test fails because one of the > integration tests it's benchmarking has failed. But I can't tell what > integration test is causing the failure. 
> Relevant parts of the logs: > {noformat} > 11:19:01 BUILD FAILED in 36s > 11:19:01 > 11:19:01 2018-11-02 18:19:01,413 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORError during benchmark > beam_integration_benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. > 11:19:01 2018-11-02 18:19:01,414 3c18295b
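The Gradle fix in the merged diff above quotes argument values that contain spaces so downstream shell parsing treats each value as a single argument. A Python sketch of the same quoting rule follows (the original is Groovy in `sdks/python/build.gradle`; the function name mirrors `mapToArgString` but this is an illustration, not project code).

```python
def map_to_arg_string(arg_map):
    # Mirror of the Gradle mapToArgString fix: list values are joined with
    # spaces and double-quoted; string values containing spaces are quoted
    # with any embedded double quotes stripped, so a value like a series
    # of flags survives shell word-splitting as one argument.
    arg_list = []
    for k, v in arg_map.items():
        if isinstance(v, list):
            v = '"{}"'.format(' '.join(v))
        elif isinstance(v, str) and ' ' in v:
            v = '"{}"'.format(v.replace('"', ''))
        arg_list.append('--{} {}'.format(k, v))
    return ' '.join(arg_list)
```

For example, `{'beam_it_args': '--project=x --region=y'}` becomes `--beam_it_args "--project=x --region=y"` rather than two broken shell words.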
[jira] [Created] (BEAM-5977) Benchmark buffer use in the Go SDK Harness
Robert Burke created BEAM-5977: -- Summary: Benchmark buffer use in the Go SDK Harness Key: BEAM-5977 URL: https://issues.apache.org/jira/browse/BEAM-5977 Project: Beam Issue Type: Improvement Components: sdk-go Reporter: Robert Burke There's an opportunity to reduce CPU and RAM usage with better buffer re-use in [datamgr.go|https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/datamgr.go], with respect to both the small-element batch buffer and large elements. There should be benchmarking around large elements and smaller elements of varying sizes (and ideally a mix of the two), so that subsequent improvements can be measured against a baseline. As a performance improvement, maintaining a small pool of `chunkSize` buffers could handle the smaller elements and avoid the associated allocation and gRPC costs, while large elements could be flushed immediately and without copying. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
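The re-use strategy suggested in the issue above can be sketched as a small free list of fixed-size buffers, with large elements bypassing the pool so they can be flushed without copying. This is a language-neutral illustration in Python, not the Go harness code; `chunk_size`, `max_pooled`, and the class name are all invented for the sketch.

```python
import collections

class BufferPool:
    # Keep a couple of chunk-sized buffers on a free list for small
    # elements; hand out dedicated buffers for large elements.
    def __init__(self, chunk_size=64 * 1024, max_pooled=2):
        self.chunk_size = chunk_size
        self.max_pooled = max_pooled
        self._free = collections.deque()

    def acquire(self, size):
        if size > self.chunk_size:
            # Large element: dedicated buffer, flushed and discarded.
            return bytearray(size)
        if self._free:
            return self._free.popleft()  # reuse, avoiding a fresh allocation
        return bytearray(self.chunk_size)

    def release(self, buf):
        # Only chunk-sized buffers return to the pool; large one-off
        # buffers are left to the garbage collector.
        if len(buf) == self.chunk_size and len(self._free) < self.max_pooled:
            self._free.append(buf)
```

Benchmarking small, large, and mixed element sizes against this kind of pool versus per-send allocation is exactly the measurement the issue asks for.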
[jira] [Commented] (BEAM-5650) Timeout exceptions while reading a lot of files from a bounded source like S3 when using TextIO
[ https://issues.apache.org/jira/browse/BEAM-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675875#comment-16675875 ] Ankit Jhalaria commented on BEAM-5650: -- PR: https://github.com/apache/beam/pull/6952 > Timeout exceptions while reading a lot of files from a bounded source like S3 > when using TextIO > --- > > Key: BEAM-5650 > URL: https://issues.apache.org/jira/browse/BEAM-5650 > Project: Beam > Issue Type: Bug > Components: io-java-aws >Reporter: Ankit Jhalaria >Assignee: Ankit Jhalaria >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > * Using TextIO, I was trying to read around 850 files. > * Getting this exception while using FlinkRunner > > {code:java} > //Caused by: org.apache.flink.runtime.client.JobExecutionException: > java.io.IOException: com.amazonaws.SdkClientException: Unable to execute HTTP > request: Timeout waiting for connection from pool at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:175) > at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:115) ... 
> 28 more Caused by: java.io.IOException: com.amazonaws.SdkClientException: > Unable to execute HTTP request: Timeout waiting for connection from pool at > org.apache.beam.sdk.io.aws.s3.S3ReadableSeekableByteChannel.read(S3ReadableSeekableByteChannel.java:91) > at > org.apache.beam.sdk.io.CompressedSource$CompressedReader$CountingChannel.read(CompressedSource.java:382) > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65) at > sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109) at > sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) at > java.io.FilterInputStream.read(FilterInputStream.java:133) at > java.io.PushbackInputStream.read(PushbackInputStream.java:186) at > org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.read(ByteStreams.java:859) > at > org.apache.beam.sdk.io.Compression$3.readDecompressed(Compression.java:81) at > org.apache.beam.sdk.io.CompressedSource$CompressionMode.createDecompressingChannel(CompressedSource.java:110) > at > org.apache.beam.sdk.io.CompressedSource$CompressedReader.startReading(CompressedSource.java:417) > at > org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476) > at > org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249) > at > org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:456) > at > org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:434) > at > org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:286) > at > 
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:279) > at > org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:250) > at > org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedSourceWrapperNoValueWithRecordId.run(FlinkStreamingTransformTranslators.java:1299) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at > org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at > java.lang.Thread.run(Thread.java:748) Caused by: > com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout > waiting for connection from pool at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1116) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1066) > at >
[jira] [Work logged] (BEAM-5650) Timeout exceptions while reading a lot of files from a bounded source like S3 when using TextIO
[ https://issues.apache.org/jira/browse/BEAM-5650?focusedWorklogId=162779=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162779 ] ASF GitHub Bot logged work on BEAM-5650: Author: ASF GitHub Bot Created on: 05/Nov/18 23:18 Start Date: 05/Nov/18 23:18 Worklog Time Spent: 10m Work Description: jhalaria commented on a change in pull request #6952: [BEAM-5650]: Modify BoundedToUnboundedSourceAdapter to configure its reader to read more than 1 bounded source URL: https://github.com/apache/beam/pull/6952#discussion_r230950472 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ## @@ -104,122 +96,102 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("source", source.getClass())).include("source", source); } - /** A {@code BoundedSource} to {@code UnboundedSource} adapter. */ @VisibleForTesting public static class BoundedToUnboundedSourceAdapter extends UnboundedSource> { -private BoundedSource boundedSource; +//TODO: There must be a better way to figure out the maxOpenConnections available. Use that number here +//Mention this as part of PR to get feedback. +private static final int READER_QUEUE_SIZE = 10; Review comment: @iemejia - Whats a good way to set the value here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162779) Time Spent: 20m (was: 10m) > Timeout exceptions while reading a lot of files from a bounded source like S3 > when using TextIO > --- > > Key: BEAM-5650 > URL: https://issues.apache.org/jira/browse/BEAM-5650 > Project: Beam > Issue Type: Bug > Components: io-java-aws >Reporter: Ankit Jhalaria >Assignee: Ankit Jhalaria >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > * Using TextIO, I was trying to read around 850 files. > * Getting this exception while using FlinkRunner > > {code:java} > //Caused by: org.apache.flink.runtime.client.JobExecutionException: > java.io.IOException: com.amazonaws.SdkClientException: Unable to execute HTTP > request: Timeout waiting for connection from pool at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:175) > at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:115) ... 
> 28 more Caused by: java.io.IOException: com.amazonaws.SdkClientException: > Unable to execute HTTP request: Timeout waiting for connection from pool at > org.apache.beam.sdk.io.aws.s3.S3ReadableSeekableByteChannel.read(S3ReadableSeekableByteChannel.java:91) > at > org.apache.beam.sdk.io.CompressedSource$CompressedReader$CountingChannel.read(CompressedSource.java:382) > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65) at > sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109) at > sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) at > java.io.FilterInputStream.read(FilterInputStream.java:133) at > java.io.PushbackInputStream.read(PushbackInputStream.java:186) at > org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.read(ByteStreams.java:859) > at > org.apache.beam.sdk.io.Compression$3.readDecompressed(Compression.java:81) at > org.apache.beam.sdk.io.CompressedSource$CompressionMode.createDecompressingChannel(CompressedSource.java:110) > at > org.apache.beam.sdk.io.CompressedSource$CompressedReader.startReading(CompressedSource.java:417) > at > org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476) > at > org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249) > at > org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:456) > at > org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:434) > at >
[jira] [Work logged] (BEAM-5650) Timeout exceptions while reading a lot of files from a bounded source like S3 when using TextIO
[ https://issues.apache.org/jira/browse/BEAM-5650?focusedWorklogId=162778=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162778 ] ASF GitHub Bot logged work on BEAM-5650: Author: ASF GitHub Bot Created on: 05/Nov/18 23:16 Start Date: 05/Nov/18 23:16 Worklog Time Spent: 10m Work Description: jhalaria opened a new pull request #6952: [BEAM-5650]: Modify BoundedToUnboundedSourceAdapter to configure its reader to read more than 1 bounded source URL: https://github.com/apache/beam/pull/6952 @iemejia - Please review. Thank you. - Noticed an issue while reading around 1000 files from S3. - Started getting connection timeouts because the maximum number of open connections is set to 50. - Inside https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L250 , we open all the readers simultaneously. That's the intended behavior for an unbounded source. But when reading a bounded source (e.g. doing reads from S3), opening all connections at the same time means connections become unavailable once the number of files we try to open exceeds the possible `maxHttpConnections`, which by default is set to 50 (`ClientConfiguration.DEFAULT_MAX_CONNECTIONS`), and there isn't a way to override it. [We should have the ability to override this anyway; will create a separate PR for that.] - The change essentially gives the `BoundedToUnboundedSourceAdapter` an `ArrayDeque` so that one reader can read more than one `BoundedSource`. Each reader finishes reading from a `BoundedSource` and then moves on to the next one. - In case of checkpointing, if a reader has elements that aren't read yet from a `BoundedSource`, we create a checkpoint with the remaining elements of that `BoundedSource` and the elements remaining in the `ArrayDeque`. 
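The mechanism described in the PR above — one reader draining a queue of bounded sources instead of opening every source at once — can be sketched in a few lines. Plain iterables stand in for Beam `BoundedSource`s and the class name is invented; the real adapter is Java and additionally handles checkpointing of the partially-read source.

```python
from collections import deque

class QueuedSourceReader:
    # One reader, many sources: at most one source is "open" at a time,
    # so a bounded backend like S3 never sees more simultaneous
    # connections than there are readers.
    def __init__(self, sources):
        self._pending = deque(sources)

    def read_all(self):
        while self._pending:
            source = self._pending.popleft()
            # Fully consume this source (releasing its connection)
            # before opening the next one.
            for record in source:
                yield record

reader = QueuedSourceReader([iter([1, 2]), iter([3])])
records = list(reader.read_all())  # [1, 2, 3]
```

Checkpointing, as described in the PR, would snapshot both the unread remainder of the current source and the still-queued sources, so a restored reader resumes exactly where it left off.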
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build
[jira] [Work logged] (BEAM-5957) [beam_PerformanceTests_Python] Consistently failing due to failing integration test.
[ https://issues.apache.org/jira/browse/BEAM-5957?focusedWorklogId=162771=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162771 ] ASF GitHub Bot logged work on BEAM-5957: Author: ASF GitHub Bot Created on: 05/Nov/18 22:48 Start Date: 05/Nov/18 22:48 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #6938: [BEAM-5957] Fix beam_PerformanceTests_Python Jenkins and Gradle configs URL: https://github.com/apache/beam/pull/6938#issuecomment-436064925 beam_PerformanceTests_Python passed in [this build](https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_Python/1650/). Squash commits to one and wait for merging since LGTM is received. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162771) Time Spent: 10m Remaining Estimate: 0h > [beam_PerformanceTests_Python] Consistently failing due to failing > integration test. > > > Key: BEAM-5957 > URL: https://issues.apache.org/jira/browse/BEAM-5957 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Daniel Oliveira >Assignee: Mark Liu >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > Most recent failure: > [https://builds.apache.org/job/beam_PerformanceTests_Python/1634/] > I haven't been able to debug the root cause of the failure with what I know. > From what I can tell the performance test fails because one of the > integration tests it's benchmarking has failed. But I can't tell what > integration test is causing the failure. 
> Relevant parts of the logs: > {noformat} > 11:19:01 BUILD FAILED in 36s > 11:19:01 > 11:19:01 2018-11-02 18:19:01,413 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORError during benchmark > beam_integration_benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. 
> 11:19:01 2018-11-02 18:19:01,414 3c18295b MainThread > beam_integration_benchmark(1/1) INFO Cleaning up benchmark > beam_integration_benchmark > 11:19:01 2018-11-02 18:19:01,416 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORException running benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 860, in RunBenchmarkTask > 11:19:01 RunBenchmark(spec, collector) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. > 11:19:01 2018-11-02 18:19:01,416 3c18295b MainThread >
[jira] [Work logged] (BEAM-5957) [beam_PerformanceTests_Python] Consistently failing due to failing integration test.
[ https://issues.apache.org/jira/browse/BEAM-5957?focusedWorklogId=162772=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162772 ] ASF GitHub Bot logged work on BEAM-5957: Author: ASF GitHub Bot Created on: 05/Nov/18 22:48 Start Date: 05/Nov/18 22:48 Worklog Time Spent: 10m Work Description: markflyhigh edited a comment on issue #6938: [BEAM-5957] Fix beam_PerformanceTests_Python Jenkins and Gradle configs URL: https://github.com/apache/beam/pull/6938#issuecomment-436064925 beam_PerformanceTests_Python passed in [this build](https://builds.apache.org/view/A-D/view/Beam/job/beam_PerformanceTests_Python/1650/). Squash commits to one and wait for merging since LGTM is received. @aaltay This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162772) Time Spent: 20m (was: 10m) > [beam_PerformanceTests_Python] Consistently failing due to failing > integration test. > > > Key: BEAM-5957 > URL: https://issues.apache.org/jira/browse/BEAM-5957 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Daniel Oliveira >Assignee: Mark Liu >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > Most recent failure: > [https://builds.apache.org/job/beam_PerformanceTests_Python/1634/] > I haven't been able to debug the root cause of the failure with what I know. > From what I can tell the performance test fails because one of the > integration tests it's benchmarking has failed. But I can't tell what > integration test is causing the failure. 
> Relevant parts of the logs: > {noformat} > 11:19:01 BUILD FAILED in 36s > 11:19:01 > 11:19:01 2018-11-02 18:19:01,413 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORError during benchmark > beam_integration_benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. 
> 11:19:01 2018-11-02 18:19:01,414 3c18295b MainThread > beam_integration_benchmark(1/1) INFO Cleaning up benchmark > beam_integration_benchmark > 11:19:01 2018-11-02 18:19:01,416 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORException running benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 860, in RunBenchmarkTask > 11:19:01 RunBenchmark(spec, collector) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. > 11:19:01 2018-11-02 18:19:01,416 3c18295b MainThread >
[jira] [Work logged] (BEAM-4124) Support elements larger than 4 MB
[ https://issues.apache.org/jira/browse/BEAM-4124?focusedWorklogId=162767=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162767 ]

ASF GitHub Bot logged work on BEAM-4124:
Author: ASF GitHub Bot
Created on: 05/Nov/18 22:29
Start Date: 05/Nov/18 22:29
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #6948: [BEAM-4124] datamgr.go:Remove chunksize constraint
URL: https://github.com/apache/beam/pull/6948

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 856c0defda2..5c25be0f8bc 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -370,14 +370,11 @@ func (w *dataWriter) Flush() error {
 }

 func (w *dataWriter) Write(p []byte) (n int, err error) {
-	if len(p) > chunkSize {
-		panic(fmt.Sprintf("Incoming message too big for transport: %d > %d", len(p), chunkSize))
-	}
-
 	if len(w.buf)+len(p) > chunkSize {
+		l := len(w.buf)
 		// We can't fit this message into the buffer. We need to flush the buffer
 		if err := w.Flush(); err != nil {
-			return 0, err
+			return 0, fmt.Errorf("datamgr.go: error flushing buffer of length %d: %v", l, err)
 		}
 	}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162767) Time Spent: 40m (was: 0.5h) > Support elements larger than 4 MB > - > > Key: BEAM-4124 > URL: https://issues.apache.org/jira/browse/BEAM-4124 > Project: Beam > Issue Type: Bug > Components: sdk-go >Reporter: Cody Schroeder >Assignee: Robert Burke >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > The Go SDK harness is limited by a gRPC message size limit of 4 MB. > https://github.com/apache/beam/blob/4a32353/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L31 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
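The merged change above drops the hard panic on messages larger than the chunk size: an oversized payload now simply forces a flush of the current buffer and goes out as one large chunk. The buffering pattern can be sketched in self-contained Java as follows (hypothetical `ChunkedWriter` class for illustration only, not Beam's actual `dataWriter`):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the chunked-write pattern after the fix: when the incoming bytes
// would overflow the buffer, flush first; a payload larger than chunkSize is
// still accepted and later flushed as a single oversized chunk (no panic).
class ChunkedWriter {
    private final int chunkSize;
    private final List<byte[]> flushed = new ArrayList<>();
    private byte[] buf = new byte[0];

    ChunkedWriter(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    void write(byte[] p) {
        if (buf.length + p.length > chunkSize && buf.length > 0) {
            flush(); // can't fit this message into the buffer: flush it first
        }
        byte[] merged = new byte[buf.length + p.length];
        System.arraycopy(buf, 0, merged, 0, buf.length);
        System.arraycopy(p, 0, merged, buf.length, p.length);
        buf = merged;
    }

    void flush() {
        if (buf.length > 0) {
            flushed.add(buf);
            buf = new byte[0];
        }
    }

    List<byte[]> flushedChunks() {
        return flushed;
    }
}
```

Note that the gRPC 4 MB message limit mentioned in the issue still applies on the wire; this sketch only shows why removing the client-side panic is safe for the buffering logic itself.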
[jira] [Work logged] (BEAM-4124) Support elements larger than 4 MB
[ https://issues.apache.org/jira/browse/BEAM-4124?focusedWorklogId=162766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162766 ] ASF GitHub Bot logged work on BEAM-4124: Author: ASF GitHub Bot Created on: 05/Nov/18 22:29 Start Date: 05/Nov/18 22:29 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #6948: [BEAM-4124] datamgr.go:Remove chunksize constraint URL: https://github.com/apache/beam/pull/6948#discussion_r230937835 ## File path: sdks/go/pkg/beam/core/runtime/harness/datamgr.go ## @@ -370,14 +370,11 @@ func (w *dataWriter) Flush() error { } func (w *dataWriter) Write(p []byte) (n int, err error) { - if len(p) > chunkSize { - panic(fmt.Sprintf("Incoming message too big for transport: %d > %d", len(p), chunkSize)) - } - if len(w.buf)+len(p) > chunkSize { + l := len(w.buf) // We can't fit this message into the buffer. We need to flush the buffer if err := w.Flush(); err != nil { - return 0, err + return 0, fmt.Errorf("datamgr.go: error flushing buffer of length %d: %v", l, err) Review comment: Curios, why not totally drop `l` and use `len(w.buf)` directly as an argument to `Errorf`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162766) Time Spent: 0.5h (was: 20m) > Support elements larger than 4 MB > - > > Key: BEAM-4124 > URL: https://issues.apache.org/jira/browse/BEAM-4124 > Project: Beam > Issue Type: Bug > Components: sdk-go >Reporter: Cody Schroeder >Assignee: Robert Burke >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > The Go SDK harness is limited by a gRPC message size limit of 4 MB. > https://github.com/apache/beam/blob/4a32353/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L31 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5884) Allow nested types have null value.
[ https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=162764=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162764 ] ASF GitHub Bot logged work on BEAM-5884: Author: ASF GitHub Bot Created on: 05/Nov/18 22:24 Start Date: 05/Nov/18 22:24 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #6861: [BEAM-5884] support null values in Array and Map. URL: https://github.com/apache/beam/pull/6861 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index 86a0f4653d5..0ca6053dac4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -354,6 +354,10 @@ public boolean isCompositeType() { @Nullable public abstract FieldType getCollectionElementType(); +// For container types (e.g. ARRAY), returns nullable of the type of the contained element. +@Nullable +public abstract Boolean getCollectionElementTypeNullable(); + // For MAP type, returns the type of the key element, it must be a primitive type; @Nullable public abstract FieldType getMapKeyType(); @@ -362,6 +366,10 @@ public boolean isCompositeType() { @Nullable public abstract FieldType getMapValueType(); +// For MAP type, returns nullable of the type of the value element, it can be a nested type; +@Nullable +public abstract Boolean getMapValueTypeNullable(); + // For ROW types, returns the schema for the row. 
@Nullable public abstract Schema getRowSchema(); @@ -383,10 +391,14 @@ public boolean isCompositeType() { abstract Builder setCollectionElementType(@Nullable FieldType collectionElementType); + abstract Builder setCollectionElementTypeNullable(@Nullable Boolean nullable); + abstract Builder setMapKeyType(@Nullable FieldType mapKeyType); abstract Builder setMapValueType(@Nullable FieldType mapValueType); + abstract Builder setMapValueTypeNullable(@Nullable Boolean nullable); + abstract Builder setRowSchema(@Nullable Schema rowSchema); abstract Builder setMetadata(@Nullable byte[] metadata); @@ -434,7 +446,17 @@ public static FieldType of(TypeName typeName) { /** Create an array type for the given field type. */ public static final FieldType array(FieldType elementType) { - return FieldType.forTypeName(TypeName.ARRAY).setCollectionElementType(elementType).build(); + return FieldType.forTypeName(TypeName.ARRAY) + .setCollectionElementType(elementType) + .setCollectionElementTypeNullable(false) + .build(); +} + +public static final FieldType array(FieldType elementType, boolean nullable) { + return FieldType.forTypeName(TypeName.ARRAY) + .setCollectionElementType(elementType) + .setCollectionElementTypeNullable(nullable) + .build(); } /** Create a map type for the given key and value types. 
*/ @@ -442,6 +464,16 @@ public static final FieldType map(FieldType keyType, FieldType valueType) { return FieldType.forTypeName(TypeName.MAP) .setMapKeyType(keyType) .setMapValueType(valueType) + .setMapValueTypeNullable(false) + .build(); +} + +public static final FieldType map( +FieldType keyType, FieldType valueType, boolean valueTypeNullable) { + return FieldType.forTypeName(TypeName.MAP) + .setMapKeyType(keyType) + .setMapValueType(valueType) + .setMapValueTypeNullable(valueTypeNullable) .build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index b078f742fe9..d03f6d14bcb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -444,11 +444,21 @@ public Builder withFieldValueGetters( private Object verify(Object value, FieldType type, String fieldName) { if (TypeName.ARRAY.equals(type.getTypeName())) { -List arrayElements = verifyArray(value, type.getCollectionElementType(), fieldName); +List arrayElements = +verifyArray( +value, +type.getCollectionElementType(), +type.getCollectionElementTypeNullable(), +fieldName); return arrayElements; } else if
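The diff above threads an element-nullability flag through `FieldType.array(...)` and `FieldType.map(...)` so that `Row.verify` can reject nulls only where they are not allowed. The runtime check it enables can be sketched as follows (a standalone illustration with hypothetical names, not Beam's actual `Row.verifyArray`):

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch: reject null array elements unless the field's element
// type was declared nullable, mirroring the check the diff adds to Row.verify.
final class NullableArrayCheck {
    static List<Object> verifyArray(List<Object> values, boolean elementNullable, String fieldName) {
        for (Object v : values) {
            if (v == null && !elementNullable) {
                throw new IllegalArgumentException(
                    "Field " + fieldName + " does not permit null array elements");
            }
        }
        return values;
    }
}
```

With the defaults in the diff (`setCollectionElementTypeNullable(false)`), existing schemas keep their current strictness and callers opt in to nullable elements explicitly.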
[jira] [Work logged] (BEAM-5884) Allow nested types have null value.
[ https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=162762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162762 ] ASF GitHub Bot logged work on BEAM-5884: Author: ASF GitHub Bot Created on: 05/Nov/18 22:24 Start Date: 05/Nov/18 22:24 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6861: [BEAM-5884] support null values in Array and Map. URL: https://github.com/apache/beam/pull/6861#issuecomment-436058468 LGTM. I think we should move forwards here. This represents progress. The default is an interesting question, considering that we probably want to support arrays of rows in a compatible way to rows in a table. That sounds like a good dev list discussion. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162762) Time Spent: 3h 50m (was: 3h 40m) > Allow nested types have null value. > --- > > Key: BEAM-5884 > URL: https://issues.apache.org/jira/browse/BEAM-5884 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > > We could allow arbitrary combination of nested types have null value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5304) Support running user pipelines with the Universal Local Runner in Java.
[ https://issues.apache.org/jira/browse/BEAM-5304?focusedWorklogId=162760=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162760 ] ASF GitHub Bot logged work on BEAM-5304: Author: ASF GitHub Bot Created on: 05/Nov/18 22:11 Start Date: 05/Nov/18 22:11 Worklog Time Spent: 10m Work Description: youngoli commented on issue #6904: [BEAM-5304] Adding default and dynamic ports to ULR. URL: https://github.com/apache/beam/pull/6904#issuecomment-436054699 @lukecwik, @swegner, or @kennknowles for merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162760) Time Spent: 3h 10m (was: 3h) > Support running user pipelines with the Universal Local Runner in Java. > --- > > Key: BEAM-5304 > URL: https://issues.apache.org/jira/browse/BEAM-5304 > Project: Beam > Issue Type: Sub-task > Components: runner-direct >Reporter: Daniel Oliveira >Assignee: Daniel Oliveira >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > In order to aid testing, devs should be able to write pipelines and then > easily run them with the ULR. The Flink runner seems to have this > functionality, so the implementation could be based on that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-4544) Add support for intellij to work with vendored artifacts generated as part of the Apache Beam build process
[ https://issues.apache.org/jira/browse/BEAM-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675795#comment-16675795 ] Scott Wegner commented on BEAM-4544: This keeps on coming up on the dev@ list; I'll work on adding some documentation to the wiki FAQ. > Add support for intellij to work with vendored artifacts generated as part of > the Apache Beam build process > --- > > Key: BEAM-4544 > URL: https://issues.apache.org/jira/browse/BEAM-4544 > Project: Beam > Issue Type: Improvement > Components: build-system >Reporter: Luke Cwik >Priority: Major > > Intellij has difficulty resolving vendored artifacts such as > *org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Message*. This > prevents the code editor for making reasonable substitutions, intellisense > from working, and tracing through source code during debugging. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-5946) Upgrade google-apitools to 0.5.24
[ https://issues.apache.org/jira/browse/BEAM-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Liu resolved BEAM-5946. Resolution: Fixed > Upgrade google-apitools to 0.5.24 > - > > Key: BEAM-5946 > URL: https://issues.apache.org/jira/browse/BEAM-5946 > Project: Beam > Issue Type: Task > Components: sdk-py-core >Affects Versions: 2.9.0 >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Fix For: 2.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Python google-apitools library contains important support that fix Python 3 > incompatible issue in 0.5.23 and above. In order to make Python SDK > compatible with Python 3, we want to upgrade it to the latest version > (0.5.24). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-5770) Document IntelliJ workflow: Recover from other common IDE errors (FAQ)
[ https://issues.apache.org/jira/browse/BEAM-5770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Scott Wegner resolved BEAM-5770. Resolution: Fixed Assignee: Scott Wegner Fix Version/s: Not applicable Available here: https://cwiki.apache.org/confluence/display/BEAM/%28FAQ%29+Recovering+from+common+IDE+errors > Document IntelliJ workflow: Recover from other common IDE errors (FAQ) > -- > > Key: BEAM-5770 > URL: https://issues.apache.org/jira/browse/BEAM-5770 > Project: Beam > Issue Type: Sub-task > Components: build-system, website >Reporter: Scott Wegner >Assignee: Scott Wegner >Priority: Major > Fix For: Not applicable > > > The current IntelliJ documentation is not well organized. The plan is to > re-organize it into a set of developer workflows, with very prescriptive > steps that are easy to follow and validate that they are still working. > This task tracks writing documentation for the scenario: "Recover from other > common IDE errors (FAQ)" > The proposed set of workflows to document is listed in this notes doc: > https://docs.google.com/document/d/18eXrO9IYll4oOnFb53EBhOtIfx-JLOinTWZSIBFkLk4/edit?usp=sharing > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-5767) Document IntelliJ workflow: Run a single unit test
[ https://issues.apache.org/jira/browse/BEAM-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Scott Wegner resolved BEAM-5767. Resolution: Fixed Fix Version/s: Not applicable Available here: https://cwiki.apache.org/confluence/display/BEAM/Using+IntelliJ+IDE > Document IntelliJ workflow: Run a single unit test > -- > > Key: BEAM-5767 > URL: https://issues.apache.org/jira/browse/BEAM-5767 > Project: Beam > Issue Type: Sub-task > Components: build-system, website >Reporter: Scott Wegner >Assignee: Scott Wegner >Priority: Major > Fix For: Not applicable > > > The current IntelliJ documentation is not well organized. The plan is to > re-organize it into a set of developer workflows, with very prescriptive > steps that are easy to follow and validate that they are still working. > This task tracks writing documentation for the scenario: "How-to: Run a > single unit test" > The proposed set of workflows to document is listed in this notes doc: > https://docs.google.com/document/d/18eXrO9IYll4oOnFb53EBhOtIfx-JLOinTWZSIBFkLk4/edit?usp=sharing > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162755=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162755 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 05/Nov/18 21:28 Start Date: 05/Nov/18 21:28 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950#issuecomment-436041510 Actually I think Row always returns ` ReadableDateTime` ([Row.java#L169)](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L169)). Row does accepts `AbstractInstant` as input: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L579 So will the `ReadableDateTime` be most precise for SQL function? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162755) Time Spent: 1h (was: 50m) > use AbstractInstant as DATEITME type in functions > - > > Key: BEAM-5976 > URL: https://issues.apache.org/jira/browse/BEAM-5976 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Xu Mingmin >Assignee: Xu Mingmin >Priority: Minor > Time Spent: 1h > Remaining Estimate: 0h > > refer to discussion in > [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5928) ConcurrentModificationException from RowCoderGenerator lazy caching
[ https://issues.apache.org/jira/browse/BEAM-5928?focusedWorklogId=162752=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162752 ]

ASF GitHub Bot logged work on BEAM-5928:
Author: ASF GitHub Bot
Created on: 05/Nov/18 21:10
Start Date: 05/Nov/18 21:10
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #6927: [BEAM-5928] Change hash map to concurrent map.
URL: https://github.com/apache/beam/pull/6927

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 50be370adb0..b7485821911 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -101,12 +101,12 @@
   private static final Map CODER_MAP;

   // Cache for Coder class that are already generated.
-  private static Map> generatedCoders = Maps.newHashMap();
+  private static Map> generatedCoders = Maps.newConcurrentMap();

   static {
     // Initialize the CODER_MAP with the StackManipulations to create the primitive coders.
     // Assumes that each class contains a static of() constructor method.
-    CODER_MAP = Maps.newHashMap();
+    CODER_MAP = Maps.newConcurrentMap();
     for (Map.Entry entry : RowCoder.CODER_MAP.entrySet()) {
       StackManipulation stackManipulation =
           MethodInvocation.invoke(

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162752) Time Spent: 50m (was: 40m) > ConcurrentModificationException from RowCoderGenerator lazy caching > --- > > Key: BEAM-5928 > URL: https://issues.apache.org/jira/browse/BEAM-5928 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Benson Tucker >Assignee: Reuven Lax >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > h3. Summary: > RowCoderGenerator caches a delegate Coder once encode or decode is > exercised, but there's not an API for caching this delegate eagerly. > h3. Use Case: > When creating several PCollections to perform distinct reads with the same > schema, you might create one RowCoder.of(schema) before creating the list of > PCollections / PCollectionsList. However, once the pipeline begins and rows > arrive for encoding, these pipelines will simultaneously try to cache a > delegate coder for the row's schema. > h3. Workaround: > You can force the eager caching of the code by exercising encode in the main > application before creating PCollections using the RowCoder: > {code:java} > try { > myRowCoder.encode(null, null); > } catch (IOException | NullPointerException e) { > // do nothing > } > {code} > h3. Context: > I've only encountered this during development with the direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
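The one-line fix swaps the plain `HashMap` for a concurrent map so that lazy population of the coder cache from multiple worker threads is safe. A minimal sketch of the race-free lazy-cache pattern (hypothetical cache and key type, not the real `RowCoderGenerator`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a race-safe lazy cache: computeIfAbsent is atomic per key, so
// several PCollections hitting the same schema for the first time cannot
// corrupt the map the way simultaneous puts into a plain HashMap can, which
// is what surfaced as the ConcurrentModificationException in this issue.
final class CoderCache {
    private static final Map<String, String> GENERATED = new ConcurrentHashMap<>();

    static String coderFor(String schemaId) {
        return GENERATED.computeIfAbsent(schemaId, id -> "coder-for-" + id);
    }
}
```

This also removes the need for the workaround in the issue description (eagerly calling `encode` before pipeline construction), since the first concurrent lookups are now well-defined.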
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162753 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 05/Nov/18 21:10 Start Date: 05/Nov/18 21:10 Worklog Time Spent: 10m Work Description: XuMingmin closed pull request #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java index 3e442fe7c11..5ead358925d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java @@ -28,6 +28,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; import org.joda.time.ReadableInstant; +import org.joda.time.base.AbstractInstant; /** * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. 
It holds the @@ -137,7 +138,7 @@ public boolean accept() { case TIME: case TIMESTAMP: case DATE: -return value instanceof ReadableInstant; +return value instanceof AbstractInstant; case INTERVAL_SECOND: case INTERVAL_MINUTE: case INTERVAL_HOUR: diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index 96ad251b36b..cd93166d2c7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -30,7 +30,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.type.SqlTypeName; -import org.joda.time.ReadableInstant; +import org.joda.time.base.AbstractInstant; /** Utility methods for Calcite related operations. */ public class CalciteUtils { @@ -195,14 +195,14 @@ private static RelDataType toRelDataType( /** * SQL-Java type mapping, with specified Beam rules: - * 1. redirect {@link ReadableInstant} to {@link Date} so Calcite can recognize it. + * 1. redirect {@link AbstractInstant} to {@link Date} so Calcite can recognize it. * * @param rawType * @return */ public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) { //For Joda time types, return SQL type for java.util.Date. -if (rawType instanceof Class && ReadableInstant.class.isAssignableFrom((Class) rawType)) { +if (rawType instanceof Class && AbstractInstant.class.isAssignableFrom((Class) rawType)) { return typeFactory.createJavaType(Date.class); } return typeFactory.createJavaType((Class) rawType); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162753) Time Spent: 50m (was: 40m) > use AbstractInstant as DATEITME type in functions > - > > Key: BEAM-5976 > URL: https://issues.apache.org/jira/browse/BEAM-5976 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Xu Mingmin >Assignee: Xu Mingmin >Priority: Minor > Time Spent: 50m > Remaining Estimate: 0h > > refer to discussion in > [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5967) ProtoCoder doesn't support DynamicMessage
[ https://issues.apache.org/jira/browse/BEAM-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675771#comment-16675771 ]

Alex Van Boxel commented on BEAM-5967:
--
I indeed want to avoid conditionals in the codec; that's why I was also thinking of going for a separate class. I don't know if coding the schema with the data can be avoided (but it's the nature of the dynamic beast, I guess); I'll see if I can find a solution for this. I will prototype it with a separate class.

> ProtoCoder doesn't support DynamicMessage
> -
>
> Key: BEAM-5967
> URL: https://issues.apache.org/jira/browse/BEAM-5967
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Affects Versions: 2.8.0
> Reporter: Alex Van Boxel
> Priority: Major
>
> The ProtoCoder does make some assumptions about static messages being
> available. The DynamicMessage doesn't have some of them, mainly because the
> proto schema is defined at runtime and not at compile time.
> Does it make sense to make a special coder for DynamicMessage or build it
> into the normal ProtoCoder?
> Here is an example of the assumption being made in the current coder:
> {code:java}
> try {
>   @SuppressWarnings("unchecked")
>   T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
>   @SuppressWarnings("unchecked")
>   Parser tParser = (Parser) protoMessageInstance.getParserForType();
>   memoizedParser = tParser;
> } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
>   throw new IllegalArgumentException(e);
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162751=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162751 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 05/Nov/18 21:10 Start Date: 05/Nov/18 21:10 Worklog Time Spent: 10m Work Description: XuMingmin commented on issue #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950#issuecomment-436036035 merging This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162751) Time Spent: 40m (was: 0.5h) > use AbstractInstant as DATEITME type in functions > - > > Key: BEAM-5976 > URL: https://issues.apache.org/jira/browse/BEAM-5976 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Xu Mingmin >Assignee: Xu Mingmin >Priority: Minor > Time Spent: 40m > Remaining Estimate: 0h > > refer to discussion in > [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5967) ProtoCoder doesn't support DynamicMessage
[ https://issues.apache.org/jira/browse/BEAM-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675744#comment-16675744 ] Kenneth Knowles commented on BEAM-5967: --- So the hard question is where you are going to get that argument. If you provide it to a manually-instantiated coder then it should be pretty easy. If the code ends up with conditionals in every method, then it would make sense to have a separate class, but that is just an implementation detail. > ProtoCoder doesn't support DynamicMessage > - > > Key: BEAM-5967 > URL: https://issues.apache.org/jira/browse/BEAM-5967 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 2.8.0 >Reporter: Alex Van Boxel >Priority: Major > > The ProtoCoder does make some assumptions about static messages being > available. The DynamicMessage doesn't have some of them, mainly because the > proto schema is defined at runtime and not at compile time. > Does it make sense to make a special coder for DynamicMessage, or build it > into the normal ProtoCoder? > Here is an example of the assumption being made in the current Codec: > {code:java} > try { > @SuppressWarnings("unchecked") > T protoMessageInstance = (T) > protoMessageClass.getMethod("getDefaultInstance").invoke(null); > @SuppressWarnings("unchecked") > Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); > memoizedParser = tParser; > } catch (IllegalAccessException | InvocationTargetException | > NoSuchMethodException e) { > throw new IllegalArgumentException(e); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-5967) ProtoCoder doesn't support DynamicMessage
[ https://issues.apache.org/jira/browse/BEAM-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675744#comment-16675744 ] Kenneth Knowles edited comment on BEAM-5967 at 11/5/18 8:53 PM: So the question is where are you going to get that argument? If you provide it to a manually-instantiated coder then it should be pretty easy. If the code ends up with conditionals in every method, then it would make sense to have a separate class, but that is just an implementation detail. If you are going to include it in the data then it could be very inefficient (I think we have some Avro cases that do this :-/ ) was (Author: kenn): So the hard question is where are you going to get that argument? If you provide it to a manually-instantiated coder then it should be pretty easy. If the code ends up with conditionals in every method, then it would make sense to have a separate class, but that is just an implementation detail. > ProtoCoder doesn't support DynamicMessage > - > > Key: BEAM-5967 > URL: https://issues.apache.org/jira/browse/BEAM-5967 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 2.8.0 >Reporter: Alex Van Boxel >Priority: Major > > The ProtoCoder does make some assumptions about static messages being > available. The DynamicMessage doesn't have some of them, mainly because the > proto schema is defined at runtime and not at compile time. > Does it make sense to make a special coder for DynamicMessage or build it > into the normal ProtoCoder. 
> Here is an example of the assumption being made in the current Codec: > {code:java} > try { > @SuppressWarnings("unchecked") > T protoMessageInstance = (T) > protoMessageClass.getMethod("getDefaultInstance").invoke(null); > @SuppressWarnings("unchecked") > Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); > memoizedParser = tParser; > } catch (IllegalAccessException | InvocationTargetException | > NoSuchMethodException e) { > throw new IllegalArgumentException(e); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
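Kenn's suggestion above — supply the schema argument once, to a manually-instantiated coder, so no method needs conditionals — can be sketched without any protobuf dependency. Everything below is illustrative: `Coder`, `StringCoder`, and `DynamicRecordCoder` are hypothetical stand-ins (in Beam the "descriptor" would be a protobuf `Descriptor` and the coders would extend `CustomCoder<T>`), not the actual ProtoCoder API.

```java
import java.nio.charset.StandardCharsets;

public class CoderSketch {

  // Minimal stand-in for Beam's Coder<T>.
  interface Coder<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
  }

  // "Static" flavor: everything needed is known at compile time.
  static final class StringCoder implements Coder<String> {
    public byte[] encode(String value) { return value.getBytes(StandardCharsets.UTF_8); }
    public String decode(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
  }

  // "Dynamic" flavor: the schema argument is supplied once, at construction,
  // so no encode/decode method needs an if/else on the message kind --
  // Kenn's point about avoiding conditionals in every method.
  static final class DynamicRecordCoder implements Coder<String> {
    private final String schemaDescriptor; // hypothetical stand-in for a proto Descriptor

    DynamicRecordCoder(String schemaDescriptor) { this.schemaDescriptor = schemaDescriptor; }

    public byte[] encode(String value) {
      // A real implementation would serialize via the descriptor; here we just
      // tag the payload to show the descriptor is fixed per coder instance.
      return (schemaDescriptor + ":" + value).getBytes(StandardCharsets.UTF_8);
    }

    public String decode(byte[] bytes) {
      String s = new String(bytes, StandardCharsets.UTF_8);
      return s.substring(schemaDescriptor.length() + 1);
    }
  }

  public static void main(String[] args) {
    Coder<String> dynamic = new DynamicRecordCoder("MyRecord");
    System.out.println(dynamic.decode(dynamic.encode("hello"))); // hello
  }
}
```

Whether this lives in a separate `DynamicMessageCoder` class or inside ProtoCoder is, as noted in the thread, an implementation detail; the key design point is that the runtime schema is a constructor argument, not something rediscovered (or branched on) per element.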
[jira] [Work logged] (BEAM-5928) ConcurrentModificationException from RowCoderGenerator lazy caching
[ https://issues.apache.org/jira/browse/BEAM-5928?focusedWorklogId=162746=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162746 ] ASF GitHub Bot logged work on BEAM-5928: Author: ASF GitHub Bot Created on: 05/Nov/18 20:51 Start Date: 05/Nov/18 20:51 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #6927: [BEAM-5928] Change hash map to concurrent map. URL: https://github.com/apache/beam/pull/6927#issuecomment-436030164 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162746) Time Spent: 40m (was: 0.5h) > ConcurrentModificationException from RowCoderGenerator lazy caching > --- > > Key: BEAM-5928 > URL: https://issues.apache.org/jira/browse/BEAM-5928 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Benson Tucker >Assignee: Reuven Lax >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > h3. Summary: > RowCoderGenerator caches a delegate Coder once encode or decode is > exercised, but there's not an API for caching this delegate eagerly. > h3. Use Case: > When creating several PCollections to perform distinct reads with the same > schema, you might create one RowCoder.of(schema) before creating the list of > PCollections / PCollectionsList. However, once the pipeline begins and rows > arrive for encoding, these pipelines will simultaneously try to cache a > delegate coder for the row's schema. > h3. Workaround: > You can force the eager caching of the code by exercising encode in the main > application before creating PCollections using the RowCoder: > {code:java} > try { > myRowCoder.encode(null, null); > } catch (IOException | NullPointerException e) { > // do nothing > } > {code} > h3. 
Context: > I've only encountered this during development with the direct runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
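The race described above — multiple threads lazily populating a shared cache on first encode — and the fix in PR #6927 (hash map to concurrent map) can be sketched generically. The cache and the "generated coder" below are illustrative stand-ins, not RowCoderGenerator's actual internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class CoderCacheSketch {
  // With a plain HashMap, several threads hitting the empty cache at once can
  // throw ConcurrentModificationException or corrupt the map. ConcurrentHashMap's
  // computeIfAbsent makes the lazy population atomic and thread-safe.
  private static final Map<String, Function<String, String>> CACHE = new ConcurrentHashMap<>();

  static Function<String, String> coderFor(String schema) {
    // Stand-in for the expensive delegate-coder generation.
    return CACHE.computeIfAbsent(schema, s -> value -> s + ":" + value);
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < 8; i++) {
      pool.submit(() -> coderFor("mySchema").apply("row"));
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(CACHE.size()); // 1 -- the delegate is cached exactly once
  }
}
```

This also makes the null-encode workaround quoted above unnecessary: with an atomic `computeIfAbsent`, eager pre-caching in the main application is no longer needed for correctness.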
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162744=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162744 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 05/Nov/18 20:49 Start Date: 05/Nov/18 20:49 Worklog Time Spent: 10m Work Description: amaliujia edited a comment on issue #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950#issuecomment-436020159 LGTM. So the inconsistency is between Schema/Row and SQL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162744) Time Spent: 0.5h (was: 20m) > use AbstractInstant as DATEITME type in functions > - > > Key: BEAM-5976 > URL: https://issues.apache.org/jira/browse/BEAM-5976 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Xu Mingmin >Assignee: Xu Mingmin >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > refer to discussion in > [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162740=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162740 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 05/Nov/18 20:17 Start Date: 05/Nov/18 20:17 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950#issuecomment-436020159 LGTM. So the inconsistency is between Row and SQL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162740) Time Spent: 20m (was: 10m) > use AbstractInstant as DATEITME type in functions > - > > Key: BEAM-5976 > URL: https://issues.apache.org/jira/browse/BEAM-5976 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Xu Mingmin >Assignee: Xu Mingmin >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > refer to discussion in > [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162738=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162738 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 20:12 Start Date: 05/Nov/18 20:12 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#issuecomment-436018476 Ah, sorry, I neglected the squashing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162738) Time Spent: 5h 20m (was: 5h 10m) > Add Cast transform for Rows > --- > > Key: BEAM-5918 > URL: https://issues.apache.org/jira/browse/BEAM-5918 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Gleb Kanterov >Assignee: Gleb Kanterov >Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > There is a need for a generic transform that, given two Row schemas, will > convert rows between them. There must be a possibility to opt out of > certain kinds of conversions; for instance, converting ints to shorts can > cause overflow. As another example, a schema could have a nullable field but > never have a NULL value in practice, because it was filtered out. > What is needed: > - widening values (e.g., int -> long) > - narrowing (e.g., int -> short) > - runtime check for overflow while narrowing > - ignoring nullability (nullable=true -> nullable=false) > - weakening nullability (nullable=false -> nullable=true) > - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32)) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162736=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162736 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 20:11 Start Date: 05/Nov/18 20:11 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index 86a0f4653d5..1587a6bbee7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -292,7 +292,7 @@ public int hashCode() { INT16, // two-byte signed integer. INT32, // four-byte signed integer. INT64, // eight-byte signed integer. -DECIMAL, // Decimal integer +DECIMAL, // Arbitrary-precision decimal number FLOAT, DOUBLE, STRING, // String. 
@@ -338,6 +338,47 @@ public boolean isMapType() { public boolean isCompositeType() { return COMPOSITE_TYPES.contains(this); } + +public boolean isSubtypeOf(TypeName other) { + return other.isSupertypeOf(this); +} + +public boolean isSupertypeOf(TypeName other) { + if (this == other) { +return true; + } + + // defined only for numeric types + if (!isNumericType() || !other.isNumericType()) { +return false; + } + + switch (this) { +case BYTE: + return false; + +case INT16: + return other == BYTE; + +case INT32: + return other == BYTE || other == INT16; + +case INT64: + return other == BYTE || other == INT16 || other == INT32; + +case FLOAT: + return false; + +case DOUBLE: + return other == FLOAT; + +case DECIMAL: + return other == FLOAT || other == DOUBLE; + +default: + throw new AssertionError("Unexpected numeric type: " + this); + } +} } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java new file mode 100644 index 000..3048806edf0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.utils.SchemaZipFold; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** Set of utilities for casting rows between schemas. */ +@Experimental(Experimental.Kind.SCHEMAS) +@AutoValue +public abstract class Cast extends PTransform<PCollection<Row>, PCollection<Row>> { + + public abstract Schema outputSchema(); +
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162737=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162737 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 20:11 Start Date: 05/Nov/18 20:11 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#issuecomment-436018354 Merged, but if you had other changes in progress, just open another PR from the branch. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162737) Time Spent: 5h 10m (was: 5h) > Add Cast transform for Rows > --- > > Key: BEAM-5918 > URL: https://issues.apache.org/jira/browse/BEAM-5918 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Gleb Kanterov >Assignee: Gleb Kanterov >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > There is a need for a generic transform that, given two Row schemas, will > convert rows between them. There must be a possibility to opt out of > certain kinds of conversions; for instance, converting ints to shorts can > cause overflow. As another example, a schema could have a nullable field but > never have a NULL value in practice, because it was filtered out. > What is needed: > - widening values (e.g., int -> long) > - narrowing (e.g., int -> short) > - runtime check for overflow while narrowing > - ignoring nullability (nullable=true -> nullable=false) > - weakening nullability (nullable=false -> nullable=true) > - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32)) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
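The numeric-widening lattice that the merged diff adds via `isSupertypeOf` can be exercised standalone. The sketch below mirrors that switch over a plain enum — the constant names are copied from the diff, everything else (class name, enum shape) is simplified for illustration:

```java
public class NumericLattice {
  enum T {
    BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, DECIMAL;

    // Same lattice as the Schema.TypeName change in the PR: a type is a
    // supertype of another when converting from the other is a safe widening.
    boolean isSupertypeOf(T other) {
      if (this == other) return true;
      switch (this) {
        case BYTE:    return false;
        case INT16:   return other == BYTE;
        case INT32:   return other == BYTE || other == INT16;
        case INT64:   return other == BYTE || other == INT16 || other == INT32;
        case FLOAT:   return false;
        case DOUBLE:  return other == FLOAT;
        case DECIMAL: return other == FLOAT || other == DOUBLE;
        default: throw new AssertionError("Unexpected numeric type: " + this);
      }
    }

    boolean isSubtypeOf(T other) { return other.isSupertypeOf(this); }
  }

  public static void main(String[] args) {
    // Widening int -> long is safe, so INT64 is a supertype of INT32 ...
    System.out.println(T.INT64.isSupertypeOf(T.INT32));  // true
    // ... while narrowing int -> short is not (needs a runtime overflow check).
    System.out.println(T.INT16.isSupertypeOf(T.INT32));  // false
    // The integer and floating-point chains are deliberately kept separate.
    System.out.println(T.DOUBLE.isSupertypeOf(T.INT32)); // false
  }
}
```

Note the design choice visible in the switch: int-to-float conversions are never "widening" here, which matches the issue's requirement that lossy conversions be opt-in rather than silent.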
[jira] [Work logged] (BEAM-5884) Allow nested types have null value.
[ https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=162735=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162735 ] ASF GitHub Bot logged work on BEAM-5884: Author: ASF GitHub Bot Created on: 05/Nov/18 20:07 Start Date: 05/Nov/18 20:07 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6861: [BEAM-5884] support null values in Array and Map. URL: https://github.com/apache/beam/pull/6861#issuecomment-436016919 Yes, having nullability as a boolean flag doesn't really compose. I think SQL will be that way forever because it is too late. But we can do the right thing in Beam schemas and map SQL into them. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162735) Time Spent: 3h 40m (was: 3.5h) > Allow nested types have null value. > --- > > Key: BEAM-5884 > URL: https://issues.apache.org/jira/browse/BEAM-5884 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > > We could allow arbitrary combinations of nested types to have null values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
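Kenn's point that a single boolean nullability flag "doesn't really compose" can be made concrete: for ARRAY or MAP types, the element type needs its own nullability, so the flag has to live on every level of the type, not just on the field. A minimal sketch — the `FieldType` class here is illustrative, not Beam's actual `Schema.FieldType` API:

```java
public class NullableTypeSketch {
  // Nullability is recorded per type node, so it composes through nesting:
  // "nullable array of non-null ints" and "non-null array of nullable ints"
  // are distinct, representable types.
  static final class FieldType {
    final String typeName;       // e.g. "INT32", "ARRAY"
    final boolean nullable;
    final FieldType elementType; // non-null only for container types

    FieldType(String typeName, boolean nullable, FieldType elementType) {
      this.typeName = typeName;
      this.nullable = nullable;
      this.elementType = elementType;
    }

    @Override public String toString() {
      String base = elementType == null ? typeName : typeName + "<" + elementType + ">";
      return nullable ? base + "?" : base;
    }
  }

  public static void main(String[] args) {
    // An array field that may itself be null, holding non-null ints:
    FieldType a = new FieldType("ARRAY", true, new FieldType("INT32", false, null));
    // An array that is never null but allows null elements:
    FieldType b = new FieldType("ARRAY", false, new FieldType("INT32", true, null));
    System.out.println(a); // ARRAY<INT32>?
    System.out.println(b); // ARRAY<INT32?>
  }
}
```

With a single per-field boolean, the two types printed above would be indistinguishable — which is exactly the gap BEAM-5884 closes for nested types.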
[jira] [Work logged] (BEAM-5976) use AbstractInstant as DATEITME type in functions
[ https://issues.apache.org/jira/browse/BEAM-5976?focusedWorklogId=162731=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162731 ] ASF GitHub Bot logged work on BEAM-5976: Author: ASF GitHub Bot Created on: 05/Nov/18 20:02 Start Date: 05/Nov/18 20:02 Worklog Time Spent: 10m Work Description: XuMingmin opened a new pull request #6950: [BEAM-5976] use AbstractInstant as DATEITME type in functions URL: https://github.com/apache/beam/pull/6950 Quick fix for discussion thread https://github.com/apache/beam/pull/6913#discussion_r230148526. R: @akedin @amaliujia @apilloud Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | --- | --- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162731) Time Spent: 10m Remaining Estimate: 0h > use AbstractInstant as DATEITME type in
[jira] [Commented] (BEAM-5957) [beam_PerformanceTests_Python] Consistently failing due to failing integration test.
[ https://issues.apache.org/jira/browse/BEAM-5957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675695#comment-16675695 ] Mark Liu commented on BEAM-5957: Since Perfkit eventually runs the Gradle command in a subprocess, the Gradle log is printed in the Jenkins console output and always contains more details. Here are some useful log lines from the console output: {code} ... 10:19:01 > Task :beam-sdks-python:integrationTest FAILED 10:19:01 Unknown option: --staging_location=gs://temp-storage-for-end-to-end-tests/staging-it 10:19:01 4 actionable tasks: 4 executed 10:19:01 10:19:01 Publishing build scan... 10:19:01 https://gradle.com/s/2bvsfzm5wk2t6 ... {code} The problem comes from the Python Gradle task which invokes the postcommit shell script. https://github.com/apache/beam/pull/6938 is out to fix the Gradle script issue. > [beam_PerformanceTests_Python] Consistently failing due to failing > integration test. > > > Key: BEAM-5957 > URL: https://issues.apache.org/jira/browse/BEAM-5957 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Daniel Oliveira >Assignee: Mark Liu >Priority: Major > > Most recent failure: > [https://builds.apache.org/job/beam_PerformanceTests_Python/1634/] > I haven't been able to debug the root cause of the failure with what I know. > From what I can tell, the performance test fails because one of the > integration tests it's benchmarking has failed. But I can't tell which > integration test is causing the failure. 
> Relevant parts of the logs: > {noformat} > 11:19:01 BUILD FAILED in 36s > 11:19:01 > 11:19:01 2018-11-02 18:19:01,413 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORError during benchmark > beam_integration_benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. 
> 11:19:01 2018-11-02 18:19:01,414 3c18295b MainThread > beam_integration_benchmark(1/1) INFO Cleaning up benchmark > beam_integration_benchmark > 11:19:01 2018-11-02 18:19:01,416 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORException running benchmark > 11:19:01 Traceback (most recent call last): > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 860, in RunBenchmarkTask > 11:19:01 RunBenchmark(spec, collector) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 719, in RunBenchmark > 11:19:01 DoRunPhase(spec, collector, detailed_timer) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/pkb.py", > line 580, in DoRunPhase > 11:19:01 samples = spec.BenchmarkRun(spec) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/linux_benchmarks/beam_integration_benchmark.py", > line 160, in Run > 11:19:01 job_type=job_type) > 11:19:01 File > "/home/jenkins/jenkins-slave/workspace/beam_PerformanceTests_Python/PerfKitBenchmarker/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py", > line 90, in SubmitJob > 11:19:01 assert retcode == 0, "Integration Test Failed." > 11:19:01 AssertionError: Integration Test Failed. > 11:19:01 2018-11-02 18:19:01,416 3c18295b MainThread > beam_integration_benchmark(1/1) ERRORBenchmark 1/1 > beam_integration_benchmark (UID: beam_integration_benchmark0) failed. > Execution will continue. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5921) [SQL] Support Joda types for UDF arguments
[ https://issues.apache.org/jira/browse/BEAM-5921?focusedWorklogId=162722=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162722 ] ASF GitHub Bot logged work on BEAM-5921: Author: ASF GitHub Bot Created on: 05/Nov/18 19:44 Start Date: 05/Nov/18 19:44 Worklog Time Spent: 10m Work Description: XuMingmin commented on a change in pull request #6913: [BEAM-5921] [SQL] Support Joda types for UDF arguments URL: https://github.com/apache/beam/pull/6913#discussion_r230886346 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java ## @@ -189,4 +192,19 @@ private static RelDataType toRelDataType( return dataTypeFactory.createTypeWithNullability(type, field.getNullable()); } + + /** + * SQL-Java type mapping, with specified Beam rules: + * 1. redirect {@link ReadableInstant} to {@link Date} so Calcite can recognize it. + * + * @param rawType + * @return + */ + public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) { +//For Joda time types, return SQL type for java.util.Date. +if (rawType instanceof Class && ReadableInstant.class.isAssignableFrom((Class) rawType)) { Review comment: Good points here; it seems to be an inconsistency between `Row` and `BeamSqlPrimitive`: 1. `Row` accepts `AbstractInstant` and always stores/returns an `Instant`; 2. `BeamSqlPrimitive` accepts `ReadableInstant` and returns it as is. Created https://issues.apache.org/jira/browse/BEAM-5976 to fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162722) Time Spent: 3h 40m (was: 3.5h) > [SQL] Support Joda types for UDF arguments > -- > > Key: BEAM-5921 > URL: https://issues.apache.org/jira/browse/BEAM-5921 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Xu Mingmin >Priority: Major > Fix For: 2.9.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > > We call ScalarFunctionImpl.create() to register a UDF with Calcite schema in > BeamSqlEnv. Internally it uses Calcite's internal mapping > (JavaToSqlTypeConversionRules) to map Java types to SQL types to create a > function signature that gets registered in the schema. Problem is that this > logic is not extensible and doesn't include Joda types support (maybe others > as well). > We can work around this by constructing our own subclass of Function that > gets registered in the schema instead of calling ScalarFunctionImpl.create(). > This logic can use our own custom mapping (or fall back to Calcite > implementation if needed). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
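The redirection in the reviewed `sqlTypeWithAutoCast` — check assignability to a base time class and substitute `java.util.Date`, so Calcite's built-in Java-to-SQL mapping can recognize the type — can be sketched with a stdlib stand-in. `TimeBase` and `MyInstant` below are hypothetical placeholders for Joda's `ReadableInstant` hierarchy, and `javaTypeForSql` is an illustrative simplification (the real method returns a Calcite `RelDataType`):

```java
import java.lang.reflect.Type;
import java.util.Date;

public class AutoCastSketch {
  // Stand-in hierarchy for Joda's ReadableInstant / AbstractInstant / Instant.
  interface TimeBase {}
  static class MyInstant implements TimeBase {}

  // Mirrors the assignability check under review: any subtype of the base
  // time class maps to java.util.Date; everything else passes through.
  static Type javaTypeForSql(Type rawType) {
    if (rawType instanceof Class && TimeBase.class.isAssignableFrom((Class<?>) rawType)) {
      return Date.class;
    }
    return rawType;
  }

  public static void main(String[] args) {
    System.out.println(javaTypeForSql(MyInstant.class)); // class java.util.Date
    System.out.println(javaTypeForSql(String.class));    // class java.lang.String
  }
}
```

The review comments above explain why the base class matters: accepting `ReadableInstant` here while `Row` normalizes to `Instant` is exactly the `Row`/`BeamSqlPrimitive` inconsistency that BEAM-5976 was filed to resolve.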
[jira] [Created] (BEAM-5976) use AbstractInstant as DATETIME type in functions
Xu Mingmin created BEAM-5976: Summary: use AbstractInstant as DATETIME type in functions Key: BEAM-5976 URL: https://issues.apache.org/jira/browse/BEAM-5976 Project: Beam Issue Type: Improvement Components: dsl-sql Reporter: Xu Mingmin Assignee: Xu Mingmin Refer to the discussion in [https://github.com/apache/beam/pull/6913#discussion_r230148526] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5974) Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range claim instead of markDone
[ https://issues.apache.org/jira/browse/BEAM-5974?focusedWorklogId=162716=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162716 ] ASF GitHub Bot logged work on BEAM-5974: Author: ASF GitHub Bot Created on: 05/Nov/18 19:34 Start Date: 05/Nov/18 19:34 Worklog Time Spent: 10m Work Description: lukecwik edited a comment on issue #6949: [BEAM-5974] Fix ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY) instead of exposing markDone URL: https://github.com/apache/beam/pull/6949#issuecomment-436005177 R: @iemejia @swegner CC: @jbonofre This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162716) Time Spent: 0.5h (was: 20m) > Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range > claim instead of markDone > > > Key: BEAM-5974 > URL: https://issues.apache.org/jira/browse/BEAM-5974 > Project: Beam > Issue Type: Bug > Components: io-java-hbase, sdk-java-core >Affects Versions: 2.8.0 >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker doesn't > handle tryClaim(ByteKey.EMPTY) and the related doneness check or > checkpointing since doneness checking can't handle the empty interval, > checkpointing returns invalid checkpoints or errors out since it is using the > lastClaimedKey vs the lastAttemptedKey for doneness checking -- This message was sent by Atlassian JIRA (v7.6.3#76005)
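[Editor's note] The migration described in this issue can be summarized in a small model. The following is a simplified Python sketch of the intended contract — claiming ByteKey.EMPTY replaces an explicit markDone() call, and doneness is decided by the last *attempted* key rather than the last claimed one. It is not Beam's Java implementation; all names are illustrative.

```python
class ByteKeyRangeTrackerModel:
    """Toy model of a restriction tracker over a [start, end) byte-key range.

    Claiming EMPTY (b"") means "the rest of the range is done", replacing
    an explicit markDone() method on the tracker.
    """

    EMPTY = b""

    def __init__(self, start, end):
        self.start, self.end = start, end
        self.last_attempted = None
        self.last_claimed = None

    def try_claim(self, key):
        self.last_attempted = key
        if key == self.EMPTY:   # end-of-range claim instead of markDone()
            return False
        if key >= self.end:     # attempted key falls outside the restriction
            return False
        self.last_claimed = key
        return True

    def is_done(self):
        # Doneness must use the last *attempted* key: a failed attempt past
        # the end (or an EMPTY claim) proves there is no remaining work.
        # Using last_claimed here is the bug described in the issue.
        return self.last_attempted is not None and (
            self.last_attempted == self.EMPTY
            or self.last_attempted >= self.end)


tracker = ByteKeyRangeTrackerModel(b"a", b"m")
assert tracker.try_claim(b"c")                                # inside range
assert not tracker.try_claim(ByteKeyRangeTrackerModel.EMPTY)  # end-of-range claim
assert tracker.is_done()
```

Keying the doneness check off the last attempted claim is what allows unstarted or finished restrictions to checkpoint consistently, which is the failure mode the issue describes.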
[jira] [Work logged] (BEAM-5974) Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range claim instead of markDone
[ https://issues.apache.org/jira/browse/BEAM-5974?focusedWorklogId=162714=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162714 ] ASF GitHub Bot logged work on BEAM-5974: Author: ASF GitHub Bot Created on: 05/Nov/18 19:31 Start Date: 05/Nov/18 19:31 Worklog Time Spent: 10m Work Description: lukecwik opened a new pull request #6949: [BEAM-5974] Fix ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY) instead of exposing markDone URL: https://github.com/apache/beam/pull/6949 Add support for tryClaim(ByteKey.EMPTY) and fix doneness checking and also returning checkpoints for restrictions that are unstarted or done. Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
Post-Commit Tests Status (on master branch): [table of Jenkins build-status badges per SDK (Go, Java, Python) and runner (Apex, Dataflow, Flink, Gearpump, Samza, Spark) omitted] This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162714) Time Spent: 10m Remaining Estimate: 0h
[jira] [Work logged] (BEAM-5974) Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range claim instead of markDone
[ https://issues.apache.org/jira/browse/BEAM-5974?focusedWorklogId=162715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162715 ] ASF GitHub Bot logged work on BEAM-5974: Author: ASF GitHub Bot Created on: 05/Nov/18 19:31 Start Date: 05/Nov/18 19:31 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #6949: [BEAM-5974] Fix ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY) instead of exposing markDone URL: https://github.com/apache/beam/pull/6949#issuecomment-436005177 R: @iemejia @swegner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162715) Time Spent: 20m (was: 10m) > Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range > claim instead of markDone > > > Key: BEAM-5974 > URL: https://issues.apache.org/jira/browse/BEAM-5974 > Project: Beam > Issue Type: Bug > Components: io-java-hbase, sdk-java-core >Affects Versions: 2.8.0 >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker doesn't > handle tryClaim(ByteKey.EMPTY) and the related doneness check or > checkpointing since doneness checking can't handle the empty interval, > checkpointing returns invalid checkpoints or errors out since it is using the > lastClaimedKey vs the lastAttemptedKey for doneness checking -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5884) Allow nested types have null value.
[ https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=162712=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162712 ] ASF GitHub Bot logged work on BEAM-5884: Author: ASF GitHub Bot Created on: 05/Nov/18 19:27 Start Date: 05/Nov/18 19:27 Worklog Time Spent: 10m Work Description: kanterov edited a comment on issue #6861: [BEAM-5884] support null values in Array and Map. URL: https://github.com/apache/beam/pull/6861#issuecomment-436003580 > We should do this regardless, but I'm curious what the SQL standard and other engines support. @kennknowles Off the top of my head, among the limitations in Spark SQL: it doesn't support types like `Map, ?>` or `Record(field: Optional)>)`. As I remember, the reason for records is that they are stored in flattened form, which requires expressing `Optional>`; that doesn't work because a single boolean flag is used. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162712) Time Spent: 3.5h (was: 3h 20m) > Allow nested types have null value. > --- > > Key: BEAM-5884 > URL: https://issues.apache.org/jira/browse/BEAM-5884 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > We could allow arbitrary combinations of nested types to have null values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5884) Allow nested types have null value.
[ https://issues.apache.org/jira/browse/BEAM-5884?focusedWorklogId=162711=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162711 ] ASF GitHub Bot logged work on BEAM-5884: Author: ASF GitHub Bot Created on: 05/Nov/18 19:27 Start Date: 05/Nov/18 19:27 Worklog Time Spent: 10m Work Description: kanterov commented on issue #6861: [BEAM-5884] support null values in Array and Map. URL: https://github.com/apache/beam/pull/6861#issuecomment-436003580 > We should do this regardless, but I'm curious what the SQL standard and other engines support. @kennknowles Out of my head, limitations in Spark SQL, it doesn't support types like: `Map, ?>` `Record(field: Optional)>)` As I remember, the reason for records is they are stored in the flattened form, that requires expressing `Optional>` that doesn't work because a boolean flag is used. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162711) Time Spent: 3h 20m (was: 3h 10m) > Allow nested types have null value. > --- > > Key: BEAM-5884 > URL: https://issues.apache.org/jira/browse/BEAM-5884 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > We could allow arbitrary combination of nested types have null value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-5975) [beam_PostRelease_NightlySnapshot] Nightly has been failing due to errors relating to runMobileGamingJavaDirect
Daniel Oliveira created BEAM-5975: - Summary: [beam_PostRelease_NightlySnapshot] Nightly has been failing due to errors relating to runMobileGamingJavaDirect Key: BEAM-5975 URL: https://issues.apache.org/jira/browse/BEAM-5975 Project: Beam Issue Type: Bug Components: test-failures Reporter: Daniel Oliveira beam_PostRelease_NightlySnapshot has failed two nights in a row due to issues relating to runMobileGamingJavaDirect. It's possible that these are two separate bugs since the failures are different, but I am grouping them together because they happened on consecutive nights and very closely to each other in the test logs, so it seems likely they're related. [https://builds.apache.org/job/beam_PostRelease_NightlySnapshot/421/] In the first failure the test times out while executing runMobileGamingJavaDirect: {noformat} 04:40:03 Waiting on bqjob_r2bb92817cbd78f51_0166debcb7da_1 ... (0s) Current status: DONE 04:40:03 +---+ 04:40:03 | table_id | 04:40:03 +---+ 04:40:03 | hourly_team_score_python_dataflow | 04:40:03 | hourly_team_score_python_direct | 04:40:03 | leaderboard_DataflowRunner_team | 04:40:03 | leaderboard_DataflowRunner_user | 04:40:03 | leaderboard_DirectRunner_team | 04:40:03 | leaderboard_DirectRunner_user | 04:40:03 +---+ 04:40:06 bq query SELECT table_id FROM beam_postrelease_mobile_gaming.__TABLES_SUMMARY__ 04:40:06 Build timed out (after 100 minutes). Marking the build as aborted. 04:40:06 Build was aborted 04:40:06 04:40:08 Finished: ABORTED{noformat} [https://builds.apache.org/job/beam_PostRelease_NightlySnapshot/422/] In the next day's run the test fails due to an issue building a class while building runMobileGamingJavaDirect: {noformat} [INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ word-count-beam --- [INFO] Changes detected - recompiling the module! [WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent! 
[INFO] Compiling 31 source files to /tmp/groovy-generated-7468741840914398994-tmpdir/word-count-beam/target/classes [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project word-count-beam: An exception occured while executing the Java class. org.apache.beam.examples.complete.game.LeaderBoard -> [Help 1] [ERROR]{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-5974) Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range claim instead of markDone
Luke Cwik created BEAM-5974: --- Summary: Migrate ByteKeyRangeTracker to use tryClaim(ByteKey.EMPTY) as end of range claim instead of markDone Key: BEAM-5974 URL: https://issues.apache.org/jira/browse/BEAM-5974 Project: Beam Issue Type: Bug Components: io-java-hbase, sdk-java-core Affects Versions: 2.8.0 Reporter: Luke Cwik Assignee: Luke Cwik org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker doesn't handle tryClaim(ByteKey.EMPTY), and the related doneness check and checkpointing are broken: the doneness check can't handle the empty interval, and checkpointing returns invalid checkpoints or errors out because it uses the lastClaimedKey rather than the lastAttemptedKey for the doneness check. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5931) Rollback PR/6899
[ https://issues.apache.org/jira/browse/BEAM-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675632#comment-16675632 ] Ruoyun Huang commented on BEAM-5931: I'd like to ping this thread about my question (https://bit.ly/2qui6Ot) on the dev mailing list. I saw Lukaz Gajowy is the person who made recent significant changes to these Jenkins files. Would you please share your workflow for debugging and test-running Jenkins files? I have been using command strings in the PR, but that is very inefficient. I'd really appreciate any suggestion on the best way to do this. Thanks. > Rollback PR/6899 > > > Key: BEAM-5931 > URL: https://issues.apache.org/jira/browse/BEAM-5931 > Project: Beam > Issue Type: Task > Components: beam-model, runner-dataflow >Reporter: Luke Cwik >Assignee: Ruoyun Huang >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > To rollback this change, one must either: > 1) Update nexmark / perf test framework to use Dataflow worker jar. > This requires adding the > {code} > "--dataflowWorkerJar=${dataflowWorkerJar}", > "--workerHarnessContainerImage=", > {code} > when running the tests. > OR > 2) Update the dataflow worker image with code that contains the rollback of > PR/6899 and then rollback PR/6899 in Github with the updated Dataflow worker > image. > #1 is preferable since we will no longer have tests running that don't use a > Dataflow worker jar built from Github HEAD. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162705=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162705 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 19:07 Start Date: 05/Nov/18 19:07 Worklog Time Spent: 10m Work Description: kanterov commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230873757 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java ## @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.utils.SchemaZipFold; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** Set of utilities for casting rows between schemas. */ +@Experimental(Experimental.Kind.SCHEMAS) +@AutoValue +public abstract class Cast extends PTransform, PCollection> { + + public abstract Schema outputSchema(); + + public abstract Validator validator(); + + public static Cast of(Schema outputSchema, Validator validator) { +return new AutoValue_Cast<>(outputSchema, validator); + } + + public static Cast widening(Schema outputSchema) { +return new AutoValue_Cast<>(outputSchema, Widening.of()); + } + + public static Cast narrowing(Schema outputSchema) { +return new AutoValue_Cast<>(outputSchema, Narrowing.of()); + } + + /** Describes compatibility errors during casting. 
 */ + @AutoValue + public abstract static class CompatibilityError implements Serializable { + +public abstract List path(); + +public abstract String message(); + +public static CompatibilityError create(List path, String message) { + return new AutoValue_Cast_CompatibilityError(path, message); +} + } + + /** Interface for statically validating casts. */ + public interface Validator extends Serializable { +List apply(Schema input, Schema output); + } + + /** + * Widening changes to a type that can represent any possible value of the original type. + * + * Standard widening conversions: + * + * + * BYTE to INT16, INT32, INT64, FLOAT, DOUBLE, DECIMAL + * INT16 to INT32, INT64, FLOAT, DOUBLE, DECIMAL + * INT32 to INT64, FLOAT, DOUBLE, DECIMAL + * INT64 to FLOAT, DOUBLE, DECIMAL + * FLOAT to DOUBLE, DECIMAL + * DOUBLE to DECIMAL + * + * + * Row widening: + * + * + * wider schema to schema with a subset of fields + * non-nullable fields to nullable fields + * + * + * Widening doesn't lose information about the overall magnitude in the following cases: + * + * + * integral type to another integral type + * BYTE or INT16 to FLOAT, DOUBLE or DECIMAL + * INT32 to DOUBLE + * + * + * Other conversions may cause loss of precision. + */ + public static class Widening implements Validator { +private final Fold fold = new Fold(); + +public static Widening of() { + return new Widening(); +} + +@Override +public String toString() { + return "Cast.Widening"; +} + +@Override +public List apply(final Schema input, final
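[Editor's note] The widening rules enumerated in the Javadoc above are essentially a table-driven compatibility check. Below is a minimal Python sketch of the primitive-type portion; the type names follow Beam's schema TypeName values, but this is an illustration, not the Java Widening validator.

```python
# Each type maps to the set of strictly wider types, mirroring the
# "Standard widening conversions" list in the Javadoc above.
WIDENS_TO = {
    "BYTE":   {"INT16", "INT32", "INT64", "FLOAT", "DOUBLE", "DECIMAL"},
    "INT16":  {"INT32", "INT64", "FLOAT", "DOUBLE", "DECIMAL"},
    "INT32":  {"INT64", "FLOAT", "DOUBLE", "DECIMAL"},
    "INT64":  {"FLOAT", "DOUBLE", "DECIMAL"},
    "FLOAT":  {"DOUBLE", "DECIMAL"},
    "DOUBLE": {"DECIMAL"},
}


def can_widen(src, dst):
    """True if casting src -> dst is an identity or widening conversion."""
    return src == dst or dst in WIDENS_TO.get(src, set())


assert can_widen("INT32", "DOUBLE")
assert not can_widen("DOUBLE", "FLOAT")  # narrowing: needs separate validation
```

A table like this keeps the validator declarative: narrowing support (Cast.Narrowing in the PR) can then be expressed as the same check run in the opposite direction, with extra cases for conversions that may lose precision.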
[jira] [Created] (BEAM-5973) [Flake] Various ValidatesRunner Post-commits flaking due to quota issues.
Daniel Oliveira created BEAM-5973: - Summary: [Flake] Various ValidatesRunner Post-commits flaking due to quota issues. Key: BEAM-5973 URL: https://issues.apache.org/jira/browse/BEAM-5973 Project: Beam Issue Type: Bug Components: test-failures Reporter: Daniel Oliveira Multiple post-commits all seem to have failed at the same time due to extremely similar GCP errors: beam_PostCommit_Java_GradleBuild: [https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1822/] Several tests fail with one of the two following errors: {noformat} Nov 04, 2018 6:40:14 PM org.apache.beam.runners.dataflow.TestDataflowRunner$ErrorMonitorMessagesHandler process INFO: Dataflow job 2018-11-04_10_37_12-7420261977214120411 threw exception. Failure message was: Startup of the worker pool in zone us-central1-b failed to bring up any of the desired 1 workers. QUOTA_EXCEEDED: Quota 'DISKS_TOTAL_GB' exceeded. Limit: 20.0 in region us-central1.{noformat} {noformat} Nov 04, 2018 6:39:14 PM org.apache.beam.runners.dataflow.TestDataflowRunner$ErrorMonitorMessagesHandler process INFO: Dataflow job 2018-11-04_10_37_11-14433481609734431843 threw exception. Failure message was: Startup of the worker pool in zone us-central1-b failed to bring up any of the desired 1 workers. QUOTA_EXCEEDED: Quota 'CPUS' exceeded. Limit: 750.0 in region us-central1. {noformat} beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Gradle: [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Gradle/31/] Test failures include the errors pasted above, plus one new one: {noformat} Nov 04, 2018 6:38:13 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2018-11-04T18:38:04.612Z: Workflow failed. Causes: Project apache-beam-testing has insufficient quota(s) to execute this workflow with 1 instances in region us-central1. 
Quota summary (required/available): 1/7192 instances, 1/202 CPUs, 250/121 disk GB, 0/4046 SSD disk GB, 1/267 instance groups, 1/267 managed instance groups, 1/242 instance templates, 1/446 in-use IP addresses.{noformat} beam_PostCommit_Java_PVR_Flink: [https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/214/] The error appears differently but is caused by a lack of memory, so it seems related to the quota issues above. {noformat} Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0003acd8, 6654787584, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 6654787584 bytes for committing reserved memory.{noformat} Project beam_PostCommit_Java_ValidatesRunner_Flink_Gradle:[https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/2101/] I couldn't find a visible error with the failure in this job, but I'm grouping it together with the other failures due to it flaking at the same time as the other Flink VR Post-commit. I may be grouping these failures a bit too aggressively. If anyone believes that the failures are caused by different reasons please split this into multiple bugs. A possibility is that these errors are caused by us running all our post-commits at the same time, causing resources to be used up in bursts. Maybe if we stagger our post-commits some of these quota issues could be avoided. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=162685=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162685 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 05/Nov/18 18:51 Start Date: 05/Nov/18 18:51 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230856675 ## File path: sdks/python/apache_beam/io/parquetio.py ## @@ -0,0 +1,349 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""``PTransforms`` for reading from and writing to Parquet files. + +Provides two read ``PTransform``s, ``ReadFromParquet`` and +``ReadAllFromParquet``, that produces a ``PCollection`` of records. +Each record of this ``PCollection`` will contain a single record read from +a Parquet file. Records that are of simple types will be mapped into +corresponding Python types. The actual parquet file operations are done by +pyarrow. Source splitting is supported at row group granularity. + +Additionally, this module provides a write ``PTransform`` ``WriteToParquet`` +that can be used to write a given ``PCollection`` of Python objects to a +Parquet file. 
+""" +from __future__ import absolute_import + +from functools import partial + +import pyarrow as pa +from pyarrow.parquet import ParquetFile +from pyarrow.parquet import ParquetWriter + +from apache_beam.io import filebasedsink +from apache_beam.io import filebasedsource +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.iobase import RangeTracker +from apache_beam.io.iobase import Read +from apache_beam.io.iobase import Write +from apache_beam.transforms import PTransform + +__all__ = ['ReadFromParquet', 'ReadAllFromParquet', 'WriteToParquet'] + + +class ReadFromParquet(PTransform): + """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading + Parquet files.""" + + def __init__(self, file_pattern=None, min_bundle_size=0, + validate=True, columns=None): +"""Initialize :class:`ReadFromParquet`. +""" +super(ReadFromParquet, self).__init__() +self._source = _create_parquet_source( +file_pattern, +min_bundle_size, +validate=validate, +columns=columns +) + + def expand(self, pvalue): +return pvalue.pipeline | Read(self._source) + + def display_data(self): +return {'source_dd': self._source} + + +class ReadAllFromParquet(PTransform): + """A ``PTransform`` for reading ``PCollection`` of Parquet files. + + Uses source '_ParquetSource' to read a ``PCollection`` of Parquet files or + file patterns and produce a ``PCollection`` of Parquet records. + """ + + DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024 # 64MB + + def __init__(self, min_bundle_size=0, + desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, + columns=None, + label='ReadAllFiles'): +"""Initializes ``ReadAllFromParquet``. + +Args: + min_bundle_size: the minimum size in bytes, to be considered when + splitting the input into bundles. + desired_bundle_size: the desired size in bytes, to be considered when + splitting the input into bundles. + columns: list of columns that will be read from files. A column name + may be a prefix of a nested field, e.g. 
'a' will select + 'a.b', 'a.c', and 'a.d.e' +""" +super(ReadAllFromParquet, self).__init__() +source_from_file = partial( +_create_parquet_source, +min_bundle_size=min_bundle_size, +columns=columns +) +self._read_all_files = filebasedsource.ReadAllFiles( +True, CompressionTypes.AUTO, desired_bundle_size, min_bundle_size, +source_from_file) + +self.label = label + + def expand(self, pvalue): +return pvalue | self.label >> self._read_all_files + + +def _create_parquet_source(file_pattern=None, + min_bundle_size=None, +
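[Editor's note] The `columns` behavior documented in the docstring above ('a' selecting 'a.b', 'a.c', and 'a.d.e') amounts to dotted-path prefix matching. A hypothetical sketch of that selection rule, for illustration only (this is not pyarrow's implementation):

```python
def select_columns(all_columns, requested):
    """Keep a column if it equals a requested name or is nested under one
    (i.e. starts with the requested name followed by a dot)."""
    keep = []
    for col in all_columns:
        for req in requested:
            if col == req or col.startswith(req + "."):
                keep.append(col)
                break
    return keep


cols = ["a.b", "a.c", "a.d.e", "ax", "b"]
# 'a' matches its nested fields but not the unrelated column 'ax':
assert select_columns(cols, ["a"]) == ["a.b", "a.c", "a.d.e"]
```

Note the `req + "."` guard: a plain `startswith(req)` check would wrongly treat 'ax' as nested under 'a'.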
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162683&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162683 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230849365

## File path: sdks/python/apache_beam/io/parquetio.py ##

@@ -0,0 +1,349 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""``PTransforms`` for reading from and writing to Parquet files.
+
+Provides two read ``PTransform``s, ``ReadFromParquet`` and
+``ReadAllFromParquet``, that produce a ``PCollection`` of records.
+Each record of this ``PCollection`` will contain a single record read from
+a Parquet file. Records that are of simple types will be mapped into
+corresponding Python types. The actual parquet file operations are done by
+pyarrow. Source splitting is supported at row group granularity.
+
+Additionally, this module provides a write ``PTransform`` ``WriteToParquet``
+that can be used to write a given ``PCollection`` of Python objects to a
+Parquet file.
+"""
+from __future__ import absolute_import
+
+from functools import partial
+
+import pyarrow as pa
+from pyarrow.parquet import ParquetFile
+from pyarrow.parquet import ParquetWriter
+
+from apache_beam.io import filebasedsink
+from apache_beam.io import filebasedsource
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.iobase import RangeTracker
+from apache_beam.io.iobase import Read
+from apache_beam.io.iobase import Write
+from apache_beam.transforms import PTransform
+
+__all__ = ['ReadFromParquet', 'ReadAllFromParquet', 'WriteToParquet']
+
+
+class ReadFromParquet(PTransform):
+  """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
+  Parquet files."""
+
+  def __init__(self, file_pattern=None, min_bundle_size=0,
+               validate=True, columns=None):
+    """Initialize :class:`ReadFromParquet`."""
+    super(ReadFromParquet, self).__init__()
+    self._source = _create_parquet_source(
+        file_pattern,
+        min_bundle_size,
+        validate=validate,
+        columns=columns
+    )
+
+  def expand(self, pvalue):
+    return pvalue.pipeline | Read(self._source)
+
+  def display_data(self):
+    return {'source_dd': self._source}
+
+
+class ReadAllFromParquet(PTransform):
+  """A ``PTransform`` for reading a ``PCollection`` of Parquet files.
+
+  Uses source '_ParquetSource' to read a ``PCollection`` of Parquet files or
+  file patterns and produce a ``PCollection`` of Parquet records.
+  """
+
+  DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024  # 64MB
+
+  def __init__(self, min_bundle_size=0,
+               desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
+               columns=None,
+               label='ReadAllFiles'):
+    """Initializes ``ReadAllFromParquet``.
+
+    Args:
+      min_bundle_size: the minimum size in bytes, to be considered when
+        splitting the input into bundles.
+      desired_bundle_size: the desired size in bytes, to be considered when
+        splitting the input into bundles.
+      columns: list of columns that will be read from files. A column name
+        may be a prefix of a nested field, e.g. 'a' will select
+        'a.b', 'a.c', and 'a.d.e'
+    """
+    super(ReadAllFromParquet, self).__init__()
+    source_from_file = partial(
+        _create_parquet_source,
+        min_bundle_size=min_bundle_size,
+        columns=columns
+    )
+    self._read_all_files = filebasedsource.ReadAllFiles(
+        True, CompressionTypes.AUTO, desired_bundle_size, min_bundle_size,

Review comment: Does parquet support all compression types mentioned in CompressionTypes? If not, we should explicitly mention the types that are supported.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
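The ``columns`` docstring above says a column name may also act as a prefix of a nested field, so that 'a' selects 'a.b', 'a.c', and 'a.d.e'. A minimal sketch of that matching rule, in plain Python (the helper name is illustrative only and is not part of the PR's implementation):

```python
def select_columns(all_columns, requested):
    """Return the columns matched by each requested name.

    A requested name matches a column if it is equal to it, or if it is a
    dotted prefix of a nested field (e.g. 'a' matches 'a.b' but not 'ab').
    """
    selected = []
    for name in all_columns:
        for req in requested:
            if name == req or name.startswith(req + '.'):
                selected.append(name)
                break
    return selected

# 'a' selects the nested fields but not the unrelated column 'ab':
print(select_columns(['a.b', 'a.c', 'a.d.e', 'ab', 'x'], ['a']))
# -> ['a.b', 'a.c', 'a.d.e']
```

Note the dot in `req + '.'`: it is what prevents 'a' from accidentally matching a sibling column like 'ab'.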
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162697 ]

ASF GitHub Bot logged work on BEAM-5918:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:52
            Start Date: 05/Nov/18 18:52
    Worklog Time Spent: 10m
      Work Description: kanterov commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows
URL: https://github.com/apache/beam/pull/6888#discussion_r230868788

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SchemaZipFold.java ##

@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+
+/**
+ * Visitor that zips schemas, and accepts pairs of fields and their types.
+ *
+ * Values returned by `accept` are accumulated.
+ */
+public abstract class SchemaZipFold implements Serializable {

Review comment: It isn't optimized to have few allocations or be fast, because it's called only a couple of times during graph construction, and, in my understanding, performance isn't a concern here. The reason I extracted zip and fold is that otherwise it's hard to see the actual narrowing/widening logic. I agree that it is a bit outside the rest of the codebase. One alternative could be creating something like `class ZippedSchemas`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
    Worklog Id: (was: 162697)
    Time Spent: 4h 40m  (was: 4.5h)

> Add Cast transform for Rows
> ---------------------------
>
>                 Key: BEAM-5918
>                 URL: https://issues.apache.org/jira/browse/BEAM-5918
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Gleb Kanterov
>            Assignee: Gleb Kanterov
>            Priority: Major
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> There is a need for a generic transform that, given two Row schemas, will
> convert rows between them. There must be a possibility to opt out of
> certain kinds of conversions; for instance, converting ints to shorts can
> cause overflow. Another example: a schema could have a nullable field but
> never have a NULL value in practice, because it was filtered out.
> What is needed:
> - widening values (e.g., int -> long)
> - narrowing (e.g., int -> short)
> - runtime check for overflow while narrowing
> - ignoring nullability (nullable=true -> nullable=false)
> - weakening nullability (nullable=false -> nullable=true)
> - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32))

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
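The "runtime check for overflow while narrowing" requirement from the issue description can be sketched language-agnostically. The following Python snippet is purely illustrative (Beam's actual Cast transform is implemented in Java, and the helper name here is hypothetical); it shows the int32 -> int16 narrowing case:

```python
# int16 range, for the narrowing case (int -> short) named in the issue.
INT16_MIN, INT16_MAX = -2**15, 2**15 - 1

def narrow_to_int16(value):
    """Narrow an integer to int16, raising at runtime on overflow."""
    if not INT16_MIN <= value <= INT16_MAX:
        raise OverflowError('%d does not fit in int16' % value)
    return value

print(narrow_to_int16(1234))    # fits, passes through unchanged
```

Widening (e.g. int -> long) needs no such check, since every value of the narrower type is representable in the wider one; that asymmetry is why the issue asks for an opt-out only on narrowing conversions.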
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162687 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230848976

## File path: sdks/python/apache_beam/io/parquetio.py ##

[The quoted diff context duplicates the parquetio.py excerpt shown in worklog 162683 above; the comment is on this line of the ``ReadAllFromParquet`` docstring:]

+      min_bundle_size: the minimum size in bytes, to be considered when

Review comment: This is used when splitting the BoundedSource, I believe.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
    Worklog Id: (was: 162687)
    Time Spent: 4h 40m  (was: 4.5h)

> Parquet IO for Python SDK
> -------------------------
>
>                 Key: BEAM-4444
>                 URL: https://issues.apache.org/jira/browse/BEAM-4444
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Bruce Arctor
>            Assignee: Heejong Lee
>            Priority: Major
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Add Parquet Support for the Python SDK.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
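The review comment above notes that ``min_bundle_size`` is used when splitting the BoundedSource, alongside ``desired_bundle_size``. A simplified sketch of how the two sizes might interact when splitting a byte range into bundles (this is not Beam's actual splitting code, just an illustration of the semantics of the two parameters):

```python
def split_into_bundles(total_size, desired_bundle_size, min_bundle_size=0):
    """Split [0, total_size) into (start, end) offset ranges of roughly
    desired_bundle_size bytes each. A trailing remainder smaller than
    min_bundle_size is merged into the previous bundle rather than
    emitted as a tiny bundle of its own."""
    bundle_size = max(desired_bundle_size, min_bundle_size)
    bundles = []
    start = 0
    while start < total_size:
        end = min(start + bundle_size, total_size)
        if bundles and end - start < min_bundle_size:
            # too-small remainder: grow the previous bundle instead
            bundles[-1] = (bundles[-1][0], end)
        else:
            bundles.append((start, end))
        start = end
    return bundles

# A 100-byte file with desired size 40 and minimum size 25: the final
# 20-byte remainder is folded into the second bundle.
print(split_into_bundles(100, 40, min_bundle_size=25))
# -> [(0, 40), (40, 100)]
```

In the real source, splitting additionally respects Parquet row-group boundaries (as the module docstring says, "source splitting is supported at row group granularity"), so actual bundle edges snap to row groups rather than arbitrary byte offsets.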
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162692 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230863621

## File path: sdks/python/apache_beam/io/parquetio.py ##

[The quoted diff context duplicates the parquetio.py excerpt shown in worklog 162683 above; the review comment itself is missing from this digest.]
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162694 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230856940

## File path: sdks/python/apache_beam/io/parquetio.py ##

[The quoted diff context duplicates the parquetio.py excerpt shown in worklog 162683 above; the review comment itself is missing from this digest.]
[jira] [Commented] (BEAM-5967) ProtoCoder doesn't support DynamicMessage
[ https://issues.apache.org/jira/browse/BEAM-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675590#comment-16675590 ]

Alex Van Boxel commented on BEAM-5967:
--------------------------------------

Note the subtle difference: DynamicMessage takes an argument to getDefaultInstance. Because DynamicMessage doesn't have static information about its descriptor, it needs that argument. Normal Messages are generated with all their type information embedded (so getDefaultInstance doesn't take an argument).

> ProtoCoder doesn't support DynamicMessage
> -----------------------------------------
>
>                 Key: BEAM-5967
>                 URL: https://issues.apache.org/jira/browse/BEAM-5967
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>    Affects Versions: 2.8.0
>            Reporter: Alex Van Boxel
>            Priority: Major
>
> The ProtoCoder makes some assumptions about static messages being
> available. The DynamicMessage doesn't have some of them, mainly because the
> proto schema is defined at runtime and not at compile time.
> Does it make sense to make a special coder for DynamicMessage, or to build it
> into the normal ProtoCoder?
> Here is an example of the assumption being made in the current coder:
> {code:java}
> try {
>   @SuppressWarnings("unchecked")
>   T protoMessageInstance = (T)
>       protoMessageClass.getMethod("getDefaultInstance").invoke(null);
>   @SuppressWarnings("unchecked")
>   Parser tParser = (Parser) protoMessageInstance.getParserForType();
>   memoizedParser = tParser;
> } catch (IllegalAccessException | InvocationTargetException |
>     NoSuchMethodException e) {
>   throw new IllegalArgumentException(e);
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162690 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230850626

## File path: sdks/python/apache_beam/io/parquetio.py ##

[The quoted diff context duplicates the parquetio.py excerpt shown in worklog 162683 above; the comment is on this line:]

+__all__ = ['ReadFromParquet', 'ReadAllFromParquet', 'WriteToParquet']

Review comment: Probably mark this as experimental until we are a bit more confident (a few minor releases), by adding the following text to the doc comments of the PTransforms: "Experimental; no backwards-compatibility guarantees."

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
    Worklog Id: (was: 162690)
    Time Spent: 5h  (was: 4h 50m)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162684&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162684 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230855689

## File path: sdks/python/apache_beam/io/parquetio.py ##

[The quoted diff context duplicates the parquetio.py excerpt shown in worklog 162683 above; the review comment itself is missing from this digest.]
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-4444?focusedWorklogId=162691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162691 ]

ASF GitHub Bot logged work on BEAM-4444:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 05/Nov/18 18:51
            Start Date: 05/Nov/18 18:51
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-4444] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#discussion_r230857259

## File path: sdks/python/apache_beam/io/parquetio.py ##

[The quoted diff context duplicates the parquetio.py excerpt shown in worklog 162683 above; the review comment itself is missing from this digest.]
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=162688=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162688 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 05/Nov/18 18:51 Start Date: 05/Nov/18 18:51 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230867152 ## File path: sdks/python/apache_beam/io/parquetio_test.py ## @@ -0,0 +1,376 @@
{code:python}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import

import json
import logging
import os
import sys
import tempfile
import unittest

import hamcrest as hc
import pyarrow as pa
import pyarrow.parquet as pq

from apache_beam import Create
from apache_beam import Map
from apache_beam.io import filebasedsource
from apache_beam.io import source_test_utils
from apache_beam.io.iobase import RangeTracker
from apache_beam.io.parquetio import ReadAllFromParquet
from apache_beam.io.parquetio import ReadFromParquet
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.io.parquetio import _create_parquet_sink
from apache_beam.io.parquetio import _create_parquet_source
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher


class TestParquet(unittest.TestCase):
  _temp_files = []

  @classmethod
  def setUpClass(cls):
    # Method has been renamed in Python 3
    if sys.version_info[0] < 3:
      cls.assertCountEqual = cls.assertItemsEqual

  def setUp(self):
    # Reducing the size of thread pools. Without this test execution may
    # fail in environments with limited amount of resources.
    filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2

  def tearDown(self):
    for path in self._temp_files:
      if os.path.exists(path):
        os.remove(path)
        parent = os.path.dirname(path)
        if not os.listdir(parent):
          os.rmdir(parent)
    self._temp_files = []

  RECORDS = [
      {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'},
      {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'},
      {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'},
      {'name': 'Gordon', 'favorite_number': 4, 'favorite_color': 'blue'},
      {'name': 'Emily', 'favorite_number': -1, 'favorite_color': 'Red'},
      {'name': 'Percy', 'favorite_number': 6, 'favorite_color': 'Green'},
  ]

  SCHEMA = pa.schema([
      ('name', pa.binary()),
      ('favorite_number', pa.int64()),
      ('favorite_color', pa.binary())
  ])

  def _record_to_columns(self, records, schema):
    col_list = []
    for n in schema.names:
      column = []
      for r in records:
        column.append(r[n])
      col_list.append(column)
    return col_list

  def _write_data(self,
                  directory=None,
                  prefix=tempfile.template,
                  row_group_size=1000,
                  codec='none',
                  count=len(RECORDS)):
    with tempfile.NamedTemporaryFile(
        delete=False, dir=directory, prefix=prefix) as f:
      len_records = len(self.RECORDS)
      data = []
      for i in range(count):
        data.append(self.RECORDS[i % len_records])
      col_data = self._record_to_columns(data, self.SCHEMA)
      col_array = [pa.array(c) for c in col_data]
      table = pa.Table.from_arrays(col_array, self.SCHEMA.names)
      pq.write_table(table, f,
{code}
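The quoted `_record_to_columns` helper pivots row-oriented records into one value list per schema column before building a pyarrow `Table`. The same pivot can be sketched in plain Python, with no pyarrow dependency (`record_to_columns` here is an illustrative stand-in for the test helper, not Beam code):

```python
def record_to_columns(records, column_names):
    # Pivot a list of row dicts into one list of values per column,
    # preserving the order given by column_names.
    return [[record[name] for record in records] for name in column_names]

records = [
    {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'},
    {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'},
]
print(record_to_columns(records, ['name', 'favorite_number', 'favorite_color']))
# [['Thomas', 'Henry'], [1, 3], ['blue', 'green']]
```

Each inner list then maps directly onto one `pa.array(...)` call when assembling the columnar table.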
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=162695=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162695 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 05/Nov/18 18:51 Start Date: 05/Nov/18 18:51 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230857503 ## File path: sdks/python/apache_beam/io/parquetio.py ## @@ -0,0 +1,349 @@
{code:python}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""``PTransforms`` for reading from and writing to Parquet files.

Provides two read ``PTransform``s, ``ReadFromParquet`` and
``ReadAllFromParquet``, that produce a ``PCollection`` of records. Each
record of this ``PCollection`` will contain a single record read from a
Parquet file. Records that are of simple types will be mapped into
corresponding Python types. The actual Parquet file operations are done by
pyarrow. Source splitting is supported at row-group granularity.

Additionally, this module provides a write ``PTransform``,
``WriteToParquet``, that can be used to write a given ``PCollection`` of
Python objects to a Parquet file.
"""
from __future__ import absolute_import

from functools import partial

import pyarrow as pa
from pyarrow.parquet import ParquetFile
from pyarrow.parquet import ParquetWriter

from apache_beam.io import filebasedsink
from apache_beam.io import filebasedsource
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import RangeTracker
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform

__all__ = ['ReadFromParquet', 'ReadAllFromParquet', 'WriteToParquet']


class ReadFromParquet(PTransform):
  """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
  Parquet files."""

  def __init__(self, file_pattern=None, min_bundle_size=0,
               validate=True, columns=None):
    """Initializes :class:`ReadFromParquet`."""
    super(ReadFromParquet, self).__init__()
    self._source = _create_parquet_source(
        file_pattern,
        min_bundle_size,
        validate=validate,
        columns=columns)

  def expand(self, pvalue):
    return pvalue.pipeline | Read(self._source)

  def display_data(self):
    return {'source_dd': self._source}


class ReadAllFromParquet(PTransform):
  """A ``PTransform`` for reading a ``PCollection`` of Parquet files.

  Uses the source ``_ParquetSource`` to read a ``PCollection`` of Parquet
  files or file patterns and produce a ``PCollection`` of Parquet records.
  """

  DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024  # 64MB

  def __init__(self, min_bundle_size=0,
               desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
               columns=None,
               label='ReadAllFiles'):
    """Initializes ``ReadAllFromParquet``.

    Args:
      min_bundle_size: the minimum size in bytes, to be considered when
        splitting the input into bundles.
      desired_bundle_size: the desired size in bytes, to be considered when
        splitting the input into bundles.
      columns: list of columns that will be read from files. A column name
        may be a prefix of a nested field, e.g. 'a' will select
        'a.b', 'a.c', and 'a.d.e'.
    """
    super(ReadAllFromParquet, self).__init__()
    source_from_file = partial(
        _create_parquet_source,
        min_bundle_size=min_bundle_size,
        columns=columns)
    self._read_all_files = filebasedsource.ReadAllFiles(
        True, CompressionTypes.AUTO, desired_bundle_size, min_bundle_size,
        source_from_file)
    self.label = label

  def expand(self, pvalue):
    return pvalue | self.label >> self._read_all_files


def _create_parquet_source(file_pattern=None,
                           min_bundle_size=None,
{code}
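`ReadAllFromParquet` hands `desired_bundle_size` (64 MB by default) and `min_bundle_size` to `ReadAllFiles`, which splits each input into bundles of roughly that size. What those two knobs mean can be sketched without Beam at all (`split_into_bundles` is a hypothetical illustration, not a Beam API):

```python
DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024  # 64MB, as in ReadAllFromParquet

def split_into_bundles(total_size, desired_bundle_size, min_bundle_size=0):
    # Hypothetical helper: chop a byte range of total_size into contiguous
    # (start, end) bundles of about desired_bundle_size bytes each.
    bundles = []
    start = 0
    while start < total_size:
        end = min(start + desired_bundle_size, total_size)
        if bundles and end - start < min_bundle_size:
            # A trailing fragment smaller than min_bundle_size is folded
            # into the previous bundle rather than emitted on its own.
            bundles[-1] = (bundles[-1][0], end)
        else:
            bundles.append((start, end))
        start = end
    return bundles

# A 200 MB input with the defaults yields three 64 MB bundles plus an 8 MB tail.
print(len(split_into_bundles(200 * 1024 * 1024, DEFAULT_DESIRED_BUNDLE_SIZE)))  # 4
```

Beam's real splitting additionally respects Parquet row-group boundaries, which is why the module docstring says source splitting happens at row-group granularity.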
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=162689=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162689 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 05/Nov/18 18:51 Start Date: 05/Nov/18 18:51 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230866377 ## File path: sdks/python/apache_beam/io/parquetio_test.py ## @@ -0,0 +1,376 @@
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=162686=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162686 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 05/Nov/18 18:51 Start Date: 05/Nov/18 18:51 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230864626 ## File path: sdks/python/apache_beam/io/parquetio_test.py ## @@ -0,0 +1,376 @@
{code:python}
  RECORDS = [
      {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'},
      {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'},
{code}
Review comment: Ah, good to see Thomas and Friends "tradition" continuing here :) This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162686) > Parquet IO for Python SDK > - > > Key: BEAM- > URL: https://issues.apache.org/jira/browse/BEAM- > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 4.5h > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=162693=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162693 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 05/Nov/18 18:51 Start Date: 05/Nov/18 18:51 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#discussion_r230868171 ## File path: sdks/python/apache_beam/io/parquetio_test.py ## @@ -0,0 +1,376 @@
[jira] [Resolved] (BEAM-5932) Python SDK deprecation warnings are broken
[ https://issues.apache.org/jira/browse/BEAM-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Udi Meiri resolved BEAM-5932. - Resolution: Fixed Fix Version/s: Not applicable PR merged > Python SDK deprecation warnings are broken > -- > > Key: BEAM-5932 > URL: https://issues.apache.org/jira/browse/BEAM-5932 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Blocker > Fix For: Not applicable > > Time Spent: 1h 20m > Remaining Estimate: 0h > > [https://github.com/apache/beam/pull/6687] > removed the line > {code:python} > warnings.simplefilter("once"){code} > from annotations.py. > This disables all deprecation warnings. > Please fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
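The removed line is plain stdlib behavior and easy to demonstrate in isolation: with a `"once"` filter installed (as annotations.py used to do), a repeated `DeprecationWarning` is reported a single time, while an explicit `"ignore"` filter, similar to Python's default handling of `DeprecationWarning` in most contexts, suppresses it entirely. A minimal sketch (`deprecated_call` is a stand-in for a deprecated Beam API):

```python
import warnings

def deprecated_call():
    # Stand-in for a deprecated API entry point.
    warnings.warn("this API is deprecated", DeprecationWarning, stacklevel=2)

# With simplefilter("once"), the first use is reported and repeats are
# de-duplicated.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("once")
    deprecated_call()
    deprecated_call()
print(len(caught))  # 1

# With an "ignore" filter, the deprecation warning never surfaces.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("ignore")
    deprecated_call()
print(len(caught))  # 0
```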
[jira] [Updated] (BEAM-5966) When working "LiteralGqlQuery", Error: Query cannot have any sort orders.
[ https://issues.apache.org/jira/browse/BEAM-5966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Murat ODUNC updated BEAM-5966: -- Description: Hello.. I have used Apache Beam pipelines on my project. And I have a problem to read data from GCP DataStore. My First question is "How to create "Query" object" . I'am sharing a sample of regarding JavaDoc. [https://beam.apache.org/releases/javadoc/2.8.0/] {code:java} Query query = ...; // I dont know any idea how configure the object String projectId = "..."; Pipeline p = Pipeline.create(options); PCollection entities = p.apply( DatastoreIO.v1().read() .withProjectId(projectId) .withQuery(query)); {code} My second question is how to set "sort oder" of the query? I tried to 'LiteralGqlQuery' to read data but DataStore IO Reader failed by follow error trace.. {noformat} java.lang.IllegalArgumentException: Query cannot have any sort orders. at com.google.datastore.v1.client.QuerySplitterImpl.validateQuery(QuerySplitterImpl.java:128) ~[datastore-v1-proto-client-1.6.0.jar:na] at com.google.datastore.v1.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:69) ~[datastore-v1-proto-client-1.6.0.jar:na] at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.splitQuery(DatastoreV1.java:454) ~[beam-sdks-java-io-google-cloud-platform-2.8.0.jar:na] at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.access$100(DatastoreV1.java:264) ~[beam-sdks-java-io-google-cloud-platform-2.8.0.jar:na] at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read$SplitQueryFn.processElement(DatastoreV1.java:813) ~[beam-sdks-java-io-google-cloud-platform-2.8.0.jar:na] at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read$SplitQueryFn$DoFnInvoker.invokeProcessElement(Unknown Source) [na:na] at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275) [beam-runners-direct-java-2.8.0.jar:na] at 
org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240) [beam-runners-direct-java-2.8.0.jar:na] at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78) [beam-runners-direct-java-2.8.0.jar:na] at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:207) [beam-runners-direct-java-2.8.0.jar:na] at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55) [beam-runners-direct-java-2.8.0.jar:na] at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160) [beam-runners-direct-java-2.8.0.jar:na] at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124) [beam-runners-direct-java-2.8.0.jar:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_162] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_162] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162] {noformat} My code is here {code:java} PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); String gqlQuery = String.format("SELECT * FROM task " + "WHERE createdAt > DATETIME('%s') "+ "AND createdAt < DATETIME('%s') ORDER BY createdAt ASC", "2018-11-03T00:00:00Z", "2018-11-03T23:59:59Z" ); log.info("GQL: " + gqlQuery); String projectId = "foo-bar"; String ns = "prod-01ce205a-22ff-4bab-b133-926bdc54c8b3"; Pipeline p = Pipeline.create(options); PCollection entities = p.apply( DatastoreIO.v1().read() 
.withProjectId(projectId)
.withLiteralGqlQuery(gqlQuery)
.withQuery(query)
.withNamespace(ns))
.apply("Debug", ParDo.of(new DoFn(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    log.info(c.element().getProperties().get("foo").getStringValue());
  }
}));
p.run().waitUntilFinish();
{code}
[jira] [Work logged] (BEAM-4124) Support elements larger than 4 MB
[ https://issues.apache.org/jira/browse/BEAM-4124?focusedWorklogId=162671=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162671 ] ASF GitHub Bot logged work on BEAM-4124: Author: ASF GitHub Bot Created on: 05/Nov/18 18:32 Start Date: 05/Nov/18 18:32 Worklog Time Spent: 10m Work Description: lostluck commented on issue #6948: [BEAM-4124] datamgr.go:Remove chunksize constraint URL: https://github.com/apache/beam/pull/6948#issuecomment-435983728 R: @aaltay CC: @wcn3 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162671) Time Spent: 20m (was: 10m) > Support elements larger than 4 MB > - > > Key: BEAM-4124 > URL: https://issues.apache.org/jira/browse/BEAM-4124 > Project: Beam > Issue Type: Bug > Components: sdk-go >Reporter: Cody Schroeder >Assignee: Robert Burke >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > The Go SDK harness is limited by a gRPC message size limit of 4 MB. > https://github.com/apache/beam/blob/4a32353/sdks/go/pkg/beam/core/runtime/harness/datamgr.go#L31 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
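The constraint under discussion exists because a gRPC message defaults to a 4 MB cap, so element bytes must be batched into chunks that each fit in one transport message. The batching idea can be sketched in plain Python (illustrative only, not the Go SDK's actual `datamgr.go` logic):

```python
GRPC_DEFAULT_MAX_MESSAGE_BYTES = 4 * 1024 * 1024  # gRPC's default 4 MB cap

def chunk_payload(payload, chunk_size):
    # Split an encoded element stream into chunks no larger than
    # chunk_size, so each chunk fits in a single transport message.
    return [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]

data = b'x' * (10 * 1024 * 1024)  # a 10 MB payload
chunks = chunk_payload(data, GRPC_DEFAULT_MAX_MESSAGE_BYTES)
print(len(chunks))               # 3 chunks: 4 MB + 4 MB + 2 MB
print(b''.join(chunks) == data)  # True — reassembly is lossless
```

Small elements still get batched together for throughput; the PR's point is that the maximum element size should be dictated by the transport's configuration, not hard-coded in the SDK.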
[jira] [Work logged] (BEAM-4124) Support elements larger than 4 MB
[ https://issues.apache.org/jira/browse/BEAM-4124?focusedWorklogId=162669=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162669 ] ASF GitHub Bot logged work on BEAM-4124: Author: ASF GitHub Bot Created on: 05/Nov/18 18:31 Start Date: 05/Nov/18 18:31 Worklog Time Spent: 10m Work Description: lostluck opened a new pull request #6948: [BEAM-4124] datamgr.go:Remove chunksize constraint URL: https://github.com/apache/beam/pull/6948 Go SDK harness was limited to elements of at most ~4MB with this check, but provided no means to increase it. The chunking is valuable for batching tiny elements over RPCs to improve throughput but is less meaningful for larger elements. The Go SDK shouldn't be dictating maximum element size, and let that be configurable by the FnAPI transport (eg. GRPC) to the runner harness instead. This should have an implicit ~2-4GB cap on element size. **Please** add a meaningful description for your change here Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
[jira] [Work logged] (BEAM-5817) Nexmark test of joining stream to files
[ https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=162662=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162662 ] ASF GitHub Bot logged work on BEAM-5817: Author: ASF GitHub Bot Created on: 05/Nov/18 18:28 Start Date: 05/Nov/18 18:28 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6934: [BEAM-5817] Unify Nexmark SQL and non-SQL paths URL: https://github.com/apache/beam/pull/6934#issuecomment-435982238 Rebased now that underlying PR is merged. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162662) Time Spent: 5h (was: 4h 50m) > Nexmark test of joining stream to files > --- > > Key: BEAM-5817 > URL: https://issues.apache.org/jira/browse/BEAM-5817 > Project: Beam > Issue Type: New Feature > Components: examples-nexmark >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles >Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > Nexmark is a convenient framework for testing the use case of large scale > stream enrichment. One way is joining a stream to files, and it can be tested > via any source that Nexmark supports. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
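The enrichment use case the issue describes, joining a stream of events to a static dataset loaded from files, reduces at its core to a map-side hash join. The sketch below shows that core in plain Java with made-up record shapes; it is not Nexmark's actual model or Beam's side-input API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StreamEnrichment {
    // Enrich each streamed (bidderId, price) event with a name looked up in a
    // static side table, as a bounded side-input join would do per element.
    static List<String> enrich(List<long[]> events, Map<Long, String> sideTable) {
        List<String> out = new ArrayList<>();
        for (long[] e : events) {
            String name = sideTable.getOrDefault(e[0], "unknown");
            out.add(name + ":" + e[1]);
        }
        return out;
    }
}
```

In the real pipeline the side table would be read from the generated side-input files and broadcast to workers; only the per-element lookup is shown here.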
[jira] [Work logged] (BEAM-5961) No precommit coverage for Nexmark postcommit main entry point
[ https://issues.apache.org/jira/browse/BEAM-5961?focusedWorklogId=162657=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162657 ] ASF GitHub Bot logged work on BEAM-5961: Author: ASF GitHub Bot Created on: 05/Nov/18 18:26 Start Date: 05/Nov/18 18:26 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #6933: [BEAM-5961] Nexmark rollforward new query with test of Nexmark main URL: https://github.com/apache/beam/pull/6933 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java index 3ca6a3562ac..fcccf8d2431 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java @@ -97,8 +97,7 @@ private Result(NexmarkConfiguration configuration, NexmarkPerf perf) { private final NexmarkLauncher nexmarkLauncher; private final NexmarkConfiguration configuration; -private Run(String[] args, NexmarkConfiguration configuration) { - NexmarkOptions options = PipelineOptionsFactory.fromArgs(args).as(NexmarkOptions.class); +private Run(NexmarkOptions options, NexmarkConfiguration configuration) { this.nexmarkLauncher = new NexmarkLauncher<>(options); this.configuration = configuration; } @@ -112,9 +111,13 @@ public Result call() throws IOException { /** Entry point. 
*/ void runAll(String[] args) throws IOException { -Instant start = Instant.now(); NexmarkOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class); +runAll(options); + } + + void runAll(NexmarkOptions options) throws IOException { +Instant start = Instant.now(); Map baseline = loadBaseline(options.getBaselineFilename()); Map actual = new LinkedHashMap<>(); Set configurations = options.getSuite().getConfigurations(options); @@ -126,7 +129,7 @@ void runAll(String[] args) throws IOException { try { // Schedule all the configurations. for (NexmarkConfiguration configuration : configurations) { -completion.submit(new Run(args, configuration)); +completion.submit(new Run(options, configuration)); } // Collect all the results. diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java index 43b5a16c743..bcb097e53b6 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java @@ -56,6 +56,21 @@ */ @JsonProperty public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED; + /** The type of side input to use. */ + @JsonProperty public NexmarkUtils.SideInputType sideInputType = NexmarkUtils.SideInputType.DIRECT; + + /** Specify the number of rows to write to the side input. */ + @JsonProperty public int sideInputRowCount = 500; + + /** Specify the number of shards to write to the side input. */ + @JsonProperty public int sideInputNumShards = 3; + + /** + * Specify a prefix URL for side input files, which will be created for use queries that join the + * stream to static enrichment data. + */ + @JsonProperty public String sideInputUrl = null; + /** * Number of events to generate. 
If zero, generate as many as possible without overflowing * internal counters etc. diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 7d29834f806..7a60e4d235d 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -50,6 +50,8 @@ import org.apache.beam.sdk.nexmark.model.Event; import org.apache.beam.sdk.nexmark.model.KnownSize; import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoin; +import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoinModel; import org.apache.beam.sdk.nexmark.queries.NexmarkQuery; import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel; import org.apache.beam.sdk.nexmark.queries.Query0; @@ -1110,6 +1112,10 @@ public NexmarkPerf
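The Main.java hunks in this diff follow a common testability refactor: parse command-line args exactly once at the outermost entry point, and have inner workers accept the already-parsed options object, so tests can call the overload directly. A minimal sketch of that pattern with a hypothetical Options type (not Beam's PipelineOptionsFactory):

```java
public class EntryPoint {
    static final class Options {
        final int workers;
        Options(int workers) { this.workers = workers; }
    }

    // Parse args once at the entry point (before the refactor, each Run
    // re-parsed the raw String[] itself).
    static Options parse(String[] args) {
        return new Options(args.length > 0 ? Integer.parseInt(args[0]) : 1);
    }

    static int runAll(String[] args) {
        return runAll(parse(args));
    }

    // Tests can call this overload with options constructed directly,
    // bypassing command-line parsing entirely.
    static int runAll(Options options) {
        return options.workers; // stand-in for scheduling the configurations
    }
}
```

This is exactly what lets a precommit test exercise the postcommit main entry point without shelling out with string arguments.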
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162655=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162655 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:24 Start Date: 05/Nov/18 18:24 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#issuecomment-435980683 I will wait a short while in case you want to make any last changes, like touching up the javadoc; then I will go ahead and merge, and we can do the rest in smaller follow-up PRs. Issue Time Tracking --- Worklog Id: (was: 162655) Time Spent: 4.5h (was: 4h 20m) > Add Cast transform for Rows > --- > > Key: BEAM-5918 > URL: https://issues.apache.org/jira/browse/BEAM-5918 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Gleb Kanterov >Assignee: Gleb Kanterov >Priority: Major > Time Spent: 4.5h > Remaining Estimate: 0h > > There is a need for a generic transform that, given two Row schemas, will > convert rows between them. There must be a possibility to opt out of > certain kinds of conversions; for instance, converting ints to shorts can > cause overflow. As another example, a schema could have a nullable field but > never hold a NULL value in practice, because NULLs were filtered out. > What is needed: > - widening values (e.g., int -> long) > - narrowing (e.g., int -> short) > - runtime check for overflow while narrowing > - ignoring nullability (nullable=true -> nullable=false) > - weakening nullability (nullable=false -> nullable=true) > - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32)) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
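The "runtime check for overflow while narrowing" the issue asks for can be sketched in plain Java. This is an illustration of the idea, not the Cast transform's actual implementation:

```java
public class Narrow {
    // Narrow a long to an int, failing loudly instead of silently wrapping.
    static int toIntChecked(long v) {
        if (v < Integer.MIN_VALUE || v > Integer.MAX_VALUE) {
            throw new ArithmeticException("value " + v + " does not fit in int");
        }
        return (int) v;
    }

    // Narrow an int to a short with the same out-of-range check.
    static short toShortChecked(int v) {
        if (v < Short.MIN_VALUE || v > Short.MAX_VALUE) {
            throw new ArithmeticException("value " + v + " does not fit in short");
        }
        return (short) v;
    }
}
```

Contrast this with a bare `(short) v` cast, which would wrap 70000 around to a negative value without any error.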
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162653=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162653 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:23 Start Date: 05/Nov/18 18:23 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230854619 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java ## @@ -338,6 +338,47 @@ public boolean isMapType() { public boolean isCompositeType() { return COMPOSITE_TYPES.contains(this); } + +public boolean isSubtypeOf(TypeName other) { + return other.isSupertypeOf(this); +} + +public boolean isSupertypeOf(TypeName other) { Review comment: In #6861 nullability is added for array elements and map values. It isn't expressed in the most natural "type system" way, but we should move towards treating a nullable `T` as `OPTIONAL` with automatic coercion to `T`, rather than treating it as just a `T` with nullability as a side condition. It requires a fairly significant refactor to do so. Just something to keep in mind; it would affect this sub/supertype check. Issue Time Tracking --- Worklog Id: (was: 162653) Time Spent: 4h 20m (was: 4h 10m) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162651=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162651 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:23 Start Date: 05/Nov/18 18:23 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230858306 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/SchemaZipFold.java ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.utils; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; + +/** + * Visitor that zips schemas, and accepts pairs of fields and their types. 
+ * + * Values returned by `accept` are accumulated. */ +public abstract class SchemaZipFold implements Serializable { Review comment: I understand this class, but maybe others won't love it compared with nested switch statements / static recursive functions. Out of curiosity, does it not add some allocation cost? Issue Time Tracking --- Worklog Id: (was: 162651) Time Spent: 4h 10m (was: 4h) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
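The SchemaZipFold idea, walking two schemas in lockstep and accumulating a result from each pair of field types, can be shown without the Beam types. The sketch below zips two flat lists of type names and accumulates mismatch messages; the field-addressing scheme and message format are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class ZipFold {
    // Zip two parallel lists of field type names and accumulate a message
    // for every position where the types disagree, plus a length mismatch.
    static List<String> zipErrors(List<String> input, List<String> output) {
        List<String> errors = new ArrayList<>();
        int n = Math.min(input.size(), output.size());
        for (int i = 0; i < n; i++) {
            if (!input.get(i).equals(output.get(i))) {
                errors.add("field " + i + ": " + input.get(i) + " -> " + output.get(i));
            }
        }
        if (input.size() != output.size()) {
            errors.add("schemas have different field counts");
        }
        return errors;
    }
}
```

The real class generalizes this to nested rows, arrays, and maps via recursion; the reviewer's allocation concern is about the per-pair objects such a traversal creates compared with static recursive functions.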
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162650=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162650 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:23 Start Date: 05/Nov/18 18:23 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230857351 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java ## @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.utils.SchemaZipFold; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** Set of utilities for casting rows between schemas. */ +@Experimental(Experimental.Kind.SCHEMAS) +@AutoValue +public abstract class Cast extends PTransform, PCollection> { + + public abstract Schema outputSchema(); + + public abstract Validator validator(); + + public static Cast of(Schema outputSchema, Validator validator) { +return new AutoValue_Cast<>(outputSchema, validator); + } + + public static Cast widening(Schema outputSchema) { +return new AutoValue_Cast<>(outputSchema, Widening.of()); + } + + public static Cast narrowing(Schema outputSchema) { +return new AutoValue_Cast<>(outputSchema, Narrowing.of()); + } + + /** Describes compatibility errors during casting. 
*/ + @AutoValue + public abstract static class CompatibilityError implements Serializable { + +public abstract List path(); + +public abstract String message(); + +public static CompatibilityError create(List path, String message) { + return new AutoValue_Cast_CompatibilityError(path, message); +} + } + + /** Interface for statically validating casts. */ + public interface Validator extends Serializable { +List apply(Schema input, Schema output); + } + + /** + * Widening changes to type that can represent any possible value of the original type. + * + * Standard widening conversions: + * + * + * BYTE to INT16, INT32, INT64, FLOAT, DOUBLE, DECIMAL + * INT16 to INT32, INT64, FLOAT, DOUBLE, DECIMAL + * INT32 to INT64, FLOAT, DOUBLE, DECIMAL + * INT64 to FLOAT, DOUBLE, DECIMAL + * FLOAT to DOUBLE, DECIMAL + * DOUBLE to DECIMAL + * + * + * Row widening: + * + * + * wider schema to schema with a subset of fields + * non-nullable fields to nullable fields + * + * + * Widening doesn't lose information about the overall magnitude in following cases: + * + * + * integral type to another integral type + * BYTE or INT16 to FLOAT, DOUBLE or DECIMAL + * INT32 to DOUBLE Review comment: What about INT64? And why only some can go to DECIMAL? And adding nullability is widening too. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact
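The widening matrix quoted in the javadoc above (BYTE to INT16/INT32/INT64/FLOAT/DOUBLE/DECIMAL, and so on down to DOUBLE to DECIMAL) amounts to a lookup from each type to the set of types that can represent all of its values. A plain-Java sketch mirroring that table, not Beam's actual validator code:

```java
import java.util.Map;
import java.util.Set;

public class Widening {
    // Each type name maps to the set of types it may widen to, transcribed
    // from the conversion table in the quoted Cast javadoc.
    static final Map<String, Set<String>> WIDENS_TO = Map.of(
        "BYTE",   Set.of("INT16", "INT32", "INT64", "FLOAT", "DOUBLE", "DECIMAL"),
        "INT16",  Set.of("INT32", "INT64", "FLOAT", "DOUBLE", "DECIMAL"),
        "INT32",  Set.of("INT64", "FLOAT", "DOUBLE", "DECIMAL"),
        "INT64",  Set.of("FLOAT", "DOUBLE", "DECIMAL"),
        "FLOAT",  Set.of("DOUBLE", "DECIMAL"),
        "DOUBLE", Set.of("DECIMAL"));

    // A cast to the same type, or to a listed wider type, is widening.
    static boolean isWidening(String from, String to) {
        return from.equals(to)
            || WIDENS_TO.getOrDefault(from, Set.of()).contains(to);
    }
}
```

Note the asymmetry the reviewer is probing: INT64 to FLOAT or DOUBLE is listed as widening even though it can lose low-order bits, which is why the javadoc separately calls out which conversions preserve magnitude without loss.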
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162649=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162649 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:23 Start Date: 05/Nov/18 18:23 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230857163 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java ## @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.utils.SchemaZipFold; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** Set of utilities for casting rows between schemas. */ +@Experimental(Experimental.Kind.SCHEMAS) +@AutoValue +public abstract class Cast extends PTransform, PCollection> { + + public abstract Schema outputSchema(); + + public abstract Validator validator(); + + public static Cast of(Schema outputSchema, Validator validator) { +return new AutoValue_Cast<>(outputSchema, validator); + } + + public static Cast widening(Schema outputSchema) { +return new AutoValue_Cast<>(outputSchema, Widening.of()); + } + + public static Cast narrowing(Schema outputSchema) { +return new AutoValue_Cast<>(outputSchema, Narrowing.of()); + } + + /** Describes compatibility errors during casting. 
*/ + @AutoValue + public abstract static class CompatibilityError implements Serializable { + +public abstract List path(); + +public abstract String message(); + +public static CompatibilityError create(List path, String message) { + return new AutoValue_Cast_CompatibilityError(path, message); +} + } + + /** Interface for statically validating casts. */ + public interface Validator extends Serializable { +List apply(Schema input, Schema output); + } + + /** + * Widening changes to type that can represent any possible value of the original type. + * + * Standard widening conversions: + * + * + * BYTE to INT16, INT32, INT64, FLOAT, DOUBLE, DECIMAL + * INT16 to INT32, INT64, FLOAT, DOUBLE, DECIMAL + * INT32 to INT64, FLOAT, DOUBLE, DECIMAL + * INT64 to FLOAT, DOUBLE, DECIMAL + * FLOAT to DOUBLE, DECIMAL + * DOUBLE to DECIMAL + * + * + * Row widening: + * + * + * wider schema to schema with a subset of fields + * non-nullable fields to nullable fields + * + * + * Widening doesn't lose information about the overall magnitude in following cases: + * + * + * integral type to another integral type + * BYTE or INT16 to FLOAT, DOUBLE or DECIMAL + * INT32 to DOUBLE + * + * + * Other conversions to may cause loss of precision. + */ + public static class Widening implements Validator { +private final Fold fold = new Fold(); + +public static Widening of() { + return new Widening(); +} + +@Override +public String toString() { + return "Cast.Widening"; +} + +@Override +public List apply(final Schema input,
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162654=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162654 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:23 Start Date: 05/Nov/18 18:23 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230857029
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162652=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162652 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:23 Start Date: 05/Nov/18 18:23 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#discussion_r230857941
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162638&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162638 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 18:12 Start Date: 05/Nov/18 18:12 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#issuecomment-435976579 Sorry for the delay - taking another look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162638) Time Spent: 3h 50m (was: 3h 40m) > Add Cast transform for Rows > --- > > Key: BEAM-5918 > URL: https://issues.apache.org/jira/browse/BEAM-5918 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Gleb Kanterov >Assignee: Gleb Kanterov >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > > There is a need for a generic transform that, given two Row schemas, will > convert rows between them. There must be a possibility to opt out from > certain kinds of conversions, for instance, converting ints to shorts can > cause overflow. Another example, a schema could have a nullable field, but > never have a NULL value in practice, because it was filtered out. > What is needed: > - widening values (e.g., int -> long) > - narrowing (e.g., int -> short) > - runtime check for overflow while narrowing > - ignoring nullability (nullable=true -> nullable=false) > - weakening nullability (nullable=false -> nullable=true) > - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32)) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
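The widening matrix that BEAM-5918 asks for can be sketched as a small lookup table plus a per-field validator. This is an illustrative Python sketch, not Beam's actual Java implementation; the dict-based schema and the (field, message) error tuples are simplified stand-ins for Beam's `Schema` and `CompatibilityError`.

```python
# Lossless "widening" conversions, mirroring the list in the issue.
WIDENINGS = {
    "BYTE": {"INT16", "INT32", "INT64", "FLOAT", "DOUBLE", "DECIMAL"},
    "INT16": {"INT32", "INT64", "FLOAT", "DOUBLE", "DECIMAL"},
    "INT32": {"INT64", "FLOAT", "DOUBLE", "DECIMAL"},
    "INT64": {"FLOAT", "DOUBLE", "DECIMAL"},
    "FLOAT": {"DOUBLE", "DECIMAL"},
    "DOUBLE": {"DECIMAL"},
}

def can_widen(src, dst):
    """A cast is a valid widening if the types match or dst can hold every src value."""
    return src == dst or dst in WIDENINGS.get(src, set())

def check_cast(input_schema, output_schema, validator=can_widen):
    """Return a list of (field, message) compatibility errors, like Cast.Validator."""
    errors = []
    for field, out_type in output_schema.items():
        in_type = input_schema.get(field)
        if in_type is None:
            errors.append((field, "missing field"))
        elif not validator(in_type, out_type):
            errors.append((field, "%s cannot be cast to %s" % (in_type, out_type)))
    return errors
```

Note that projection falls out for free: only fields present in the output schema are checked, so dropping `b` from `Schema(a: Int32, b: Int32)` produces no error.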
[jira] [Updated] (BEAM-5959) Add Cloud KMS support to GCS copies
[ https://issues.apache.org/jira/browse/BEAM-5959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Udi Meiri updated BEAM-5959: Description: Beam SDK currently uses the CopyTo GCS API call, which doesn't support copying objects that use Customer Managed Encryption Keys (CMEKs). CMEKs are managed in Cloud KMS. Items (for Java and Python SDKs): - Update clients to versions that support KMS keys. - Change copyTo API calls to use rewriteTo (Python - directly, Java - possibly convert copyTo API call to use client library) - Add unit tests. - Add basic tests (DirectRunner and GCS buckets with CMEK). was:For Java and Python SDKs. > Add Cloud KMS support to GCS copies > --- > > Key: BEAM-5959 > URL: https://issues.apache.org/jira/browse/BEAM-5959 > Project: Beam > Issue Type: Bug > Components: io-java-gcp, sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > Beam SDK currently uses the CopyTo GCS API call, which doesn't support > copying objects that use Customer Managed Encryption Keys (CMEKs). > CMEKs are managed in Cloud KMS. > Items (for Java and Python SDKs): > - Update clients to versions that support KMS keys. > - Change copyTo API calls to use rewriteTo (Python - directly, Java - > possibly convert copyTo API call to use client library) > - Add unit tests. > - Add basic tests (DirectRunner and GCS buckets with CMEK). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
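The copyTo-to-rewriteTo switch matters because a GCS rewrite can re-encrypt the destination with a KMS key, and for large objects the service may return a continuation token that the caller must feed back in a loop. A minimal Python sketch of that loop; `client.rewrite` and its keyword arguments are hypothetical stand-ins for the real client call, not Beam's actual GCS client API.

```python
def rewrite_object(client, src, dst, kms_key_name=None):
    """Copy src to dst via repeated rewrite calls, optionally re-encrypting with a CMEK."""
    token = None
    while True:
        resp = client.rewrite(src, dst,
                              rewrite_token=token,
                              destination_kms_key_name=kms_key_name)
        if resp["done"]:
            return resp["resource"]
        # Large objects are rewritten in chunks; resume where the service left off.
        token = resp["rewriteToken"]
```

A plain copyTo has no equivalent token loop and no destination-key parameter, which is why the issue calls for migrating both SDKs to rewrites.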
[jira] [Work logged] (BEAM-5315) Finish Python 3 porting for io module
[ https://issues.apache.org/jira/browse/BEAM-5315?focusedWorklogId=162635&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162635 ] ASF GitHub Bot logged work on BEAM-5315: Author: ASF GitHub Bot Created on: 05/Nov/18 18:02 Start Date: 05/Nov/18 18:02 Worklog Time Spent: 10m Work Description: tvalentyn commented on a change in pull request #6925: [BEAM-5315] Partially port IO: avro schema parsing and codecs URL: https://github.com/apache/beam/pull/6925#discussion_r230851836
## File path: sdks/python/apache_beam/examples/avro_bitcoin.py
## @@ -38,7 +38,9 @@
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from avro.schema import Parse  # avro-python3 library for python3
+  from avro.schema import Parse as avro_parser  # avro-python3 library for python3
+  Parse = lambda x: avro_parser(x.decode("utf-8")) \
Review comment: We tried to address comparing types in https://github.com/apache/beam/pull/6602, but had to roll it back due to https://issues.apache.org/jira/browse/BEAM-5744. Could you take a look at https://issues.apache.org/jira/browse/BEAM-5744? If we fix that, I think we could move forward with https://github.com/apache/beam/pull/6602. See also: https://issues.apache.org/jira/browse/BEAM-5621?focusedCommentId=16649857&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16649857 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162635) Time Spent: 5h 40m (was: 5.5h) > Finish Python 3 porting for io module > - > > Key: BEAM-5315 > URL: https://issues.apache.org/jira/browse/BEAM-5315 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: Simon >Priority: Major > Time Spent: 5h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
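The avro_bitcoin.py diff above adapts avro-python3's str-only `Parse` so existing callers can keep passing bytes. The same adapter pattern as a standalone sketch, with a stub parser standing in for `avro.schema.Parse`:

```python
def make_bytes_tolerant(parse_str):
    """Wrap a str-only parser so it also accepts UTF-8 encoded bytes."""
    def parse(schema):
        if isinstance(schema, bytes):
            # avro-python3's Parse expects str; decode bytes on the way in.
            schema = schema.decode("utf-8")
        return parse_str(schema)
    return parse
```

A named wrapper like this is arguably clearer than the inline `lambda` in the diff, since it documents the bytes/str boundary in one place.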
[jira] [Closed] (BEAM-5917) Update Flink Runner to 1.5.5
[ https://issues.apache.org/jira/browse/BEAM-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed BEAM-5917. Resolution: Fixed > Update Flink Runner to 1.5.5 > > > Key: BEAM-5917 > URL: https://issues.apache.org/jira/browse/BEAM-5917 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 2.9.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > We should be on the latest hotfix version to ensure stability for users: > https://flink.apache.org/news/2018/10/29/release-1.5.5.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5910) FileSystems should retrieve lastModified time
[ https://issues.apache.org/jira/browse/BEAM-5910?focusedWorklogId=162633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162633 ] ASF GitHub Bot logged work on BEAM-5910: Author: ASF GitHub Bot Created on: 05/Nov/18 18:00 Start Date: 05/Nov/18 18:00 Worklog Time Spent: 10m Work Description: jklukas commented on issue #6914: [BEAM-5910] Add lastModified field to MatchResult.Metadata URL: https://github.com/apache/beam/pull/6914#issuecomment-435972592 Thanks for reviewing, @iemejia. I'm very interested to see what comes out of the discussion on evolving coders. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162633) Time Spent: 1h (was: 50m) > FileSystems should retrieve lastModified time > - > > Key: BEAM-5910 > URL: https://issues.apache.org/jira/browse/BEAM-5910 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Jeff Klukas >Assignee: Jeff Klukas >Priority: Minor > Time Spent: 1h > Remaining Estimate: 0h > > In the Java SDK, the Filesystems.match facilities are aimed at listing file > names and collecting only limited additional metadata from the filesystem > (sizeBytes and isReadSeekEfficient). I propose adding a new field for > lastModified time to MatchResult.Metadata that each FileSystem would populate > when listing files. > This would be a basis for a future improvement to > FileIO.match(...).continuously(...) where we could let the user opt to poll > not just for new file names, but also for existing file names if their > content has been updated. > In the near term, the addition of lastModified to Metadata would allow users > to implement their own polling logic on top of Filesystems.match to detect > and download new files from any of the supported filesystems.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
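The polling logic BEAM-5910 anticipates amounts to diffing successive match snapshots on (path, lastModified). A Python sketch of that idea, using `os.stat` mtimes on the local filesystem as a stand-in for the metadata a `FileSystems.match` call would return once the field lands:

```python
import glob
import os

def snapshot(pattern):
    """Map each path matching the glob pattern to its last-modified time."""
    return {p: os.stat(p).st_mtime for p in glob.glob(pattern)}

def changed_files(previous, current):
    """Paths that appeared, or whose lastModified advanced, since `previous`."""
    return [p for p, mtime in current.items()
            if p not in previous or mtime > previous[p]]
```

Comparing only file names (the status quo) would miss the second case, files whose names are unchanged but whose content has been rewritten, which is exactly what the lastModified field makes detectable.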
[jira] [Work logged] (BEAM-5917) Update Flink Runner to 1.5.5
[ https://issues.apache.org/jira/browse/BEAM-5917?focusedWorklogId=162634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162634 ] ASF GitHub Bot logged work on BEAM-5917: Author: ASF GitHub Bot Created on: 05/Nov/18 18:00 Start Date: 05/Nov/18 18:00 Worklog Time Spent: 10m Work Description: mxm closed pull request #6885: [BEAM-5917] Update Flink to 1.5.5 URL: https://github.com/apache/beam/pull/6885 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index ecf66d47ec4..65cb0c39ccb 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -46,7 +46,7 @@ configurations {
   validatesRunner
 }
 
-def flink_version = "1.5.3"
+def flink_version = "1.5.5"
 
 dependencies {
   compile library.java.guava
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162634) Time Spent: 1.5h (was: 1h 20m) > Update Flink Runner to 1.5.5 > > > Key: BEAM-5917 > URL: https://issues.apache.org/jira/browse/BEAM-5917 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 2.9.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > We should be on the latest hotfix version to ensure stability for users: > https://flink.apache.org/news/2018/10/29/release-1.5.5.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5464) Portable beam hangs while running TFX preprocessing step on a distributed cluster
[ https://issues.apache.org/jira/browse/BEAM-5464?focusedWorklogId=162614&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162614 ] ASF GitHub Bot logged work on BEAM-5464: Author: ASF GitHub Bot Created on: 05/Nov/18 17:45 Start Date: 05/Nov/18 17:45 Worklog Time Spent: 10m Work Description: mxm commented on issue #6897: [BEAM-5464] Use BATCH_FORCED as the default ExecutionMode for batch pipeline URL: https://github.com/apache/beam/pull/6897#issuecomment-435967569 > Just to clarify, the resulting PR just enables the option, rather than setting it as the title states, right? Yes, it just enables setting it. The default remains unchanged (`PIPELINED`). We didn't want to make changes to the legacy FlinkRunner and further investigate the issue. As a next step we could set `BATCH_FORCED` for the portable Runner. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162614) Time Spent: 2h 10m (was: 2h) > Portable beam hangs while running TFX preprocessing step on a distributed > cluster > - > > Key: BEAM-5464 > URL: https://issues.apache.org/jira/browse/BEAM-5464 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Axel Magnuson >Assignee: Ankur Goenka >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Recently I went through the exercise of running the TFX taxi example on a > dataproc cluster. However, it would always hang indefinitely. The Flink UI > indicated that the job was halfway done, but I could not see any clear > errors in the job driver logs, the job service logs, or the Flink logs. The > root cause is still a mystery to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows
[ https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=162615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162615 ] ASF GitHub Bot logged work on BEAM-5918: Author: ASF GitHub Bot Created on: 05/Nov/18 17:46 Start Date: 05/Nov/18 17:46 Worklog Time Spent: 10m Work Description: kanterov commented on issue #6888: [BEAM-5918] Add Cast transform for Rows URL: https://github.com/apache/beam/pull/6888#issuecomment-435967943 @kennknowles Gentle ping, or, perhaps, there is somebody else who can help with the review? @akedin This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 162615) Time Spent: 3h 40m (was: 3.5h) > Add Cast transform for Rows > --- > > Key: BEAM-5918 > URL: https://issues.apache.org/jira/browse/BEAM-5918 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Gleb Kanterov >Assignee: Gleb Kanterov >Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > > There is a need for a generic transform that, given two Row schemas, will > convert rows between them. There must be a possibility to opt out from > certain kinds of conversions, for instance, converting ints to shorts can > cause overflow. Another example, a schema could have a nullable field, but > never have a NULL value in practice, because it was filtered out. > What is needed: > - widening values (e.g., int -> long) > - narrowing (e.g., int -> short) > - runtime check for overflow while narrowing > - ignoring nullability (nullable=true -> nullable=false) > - weakening nullability (nullable=false -> nullable=true) > - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32)) -- This message was sent by Atlassian JIRA (v7.6.3#76005)