[jira] [Resolved] (BEAM-3247) Sample.any memory constraint
[ https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li resolved BEAM-3247. -- Resolution: Fixed Fix Version/s: 2.2.0 > Sample.any memory constraint > > > Key: BEAM-3247 > URL: https://issues.apache.org/jira/browse/BEAM-3247 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 2.1.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > Fix For: 2.2.0 > > > Right now {{Sample.any}} converts the collection to an iterable view and take > first n in a side input. This may require materializing the entire collection > to disk and is potentially inefficient. > https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74 > It can be fixed by applying a truncating `DoFn` first, then a combine into > `List` which limits the list size, and finally flattening the list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
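The fix described in BEAM-3247 above — truncate first, then combine into a size-capped list — can be sketched outside Beam as a plain accumulator. This is an illustrative stand-in in stdlib Java, not the actual `Sample.java` code; the class and method names are invented:

```java
import java.util.ArrayList;
import java.util.List;

/** Size-capped accumulator mirroring the truncate-then-combine idea from BEAM-3247 (illustrative only). */
public class BoundedSample<T> {
    private final int limit;
    private final List<T> buffer = new ArrayList<>();

    public BoundedSample(int limit) {
        this.limit = limit;
    }

    /** Add one element, ignoring it once the cap is reached (the "truncating DoFn" step). */
    public void add(T element) {
        if (buffer.size() < limit) {
            buffer.add(element);
        }
    }

    /** Merge two partial accumulators, re-applying the cap (the "combine into List" step). */
    public BoundedSample<T> merge(BoundedSample<T> other) {
        BoundedSample<T> merged = new BoundedSample<>(limit);
        for (T e : buffer) merged.add(e);
        for (T e : other.buffer) merged.add(e);
        return merged;
    }

    /** The final "flatten the list" step: expose at most {@code limit} elements. */
    public List<T> result() {
        return new ArrayList<>(buffer);
    }
}
```

Because every accumulator holds at most `limit` elements, no worker ever materializes the whole collection — which is the memory win over the iterable-side-input approach.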
[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()
[ https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596565#comment-16596565 ] Neville Li commented on BEAM-5036: -- Yeah that's why I figured. So there's no way to reduce this overhead on GCS unless if GCS starts to support efficient object {{rename}}. > Optimize FileBasedSink's WriteOperation.moveToOutput() > -- > > Key: BEAM-5036 > URL: https://issues.apache.org/jira/browse/BEAM-5036 > Project: Beam > Issue Type: Improvement > Components: io-java-files >Affects Versions: 2.5.0 >Reporter: Jozef Vilcek >Assignee: Tim Robertson >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > moveToOutput() methods in FileBasedSink.WriteOperation implements move by > copy+delete. It would be better to use a rename() which can be much more > effective for some filesystems. > Filesystem must support cross-directory rename. BEAM-4861 is related to this > for the case of HDFS filesystem. > Feature was discussed here: > http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()
[ https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596565#comment-16596565 ] Neville Li edited comment on BEAM-5036 at 8/29/18 4:23 PM: --- Yeah that's what I figured. So there's no way to reduce this overhead on GCS unless if GCS starts to support efficient object {{rename}}. was (Author: sinisa_lyh): Yeah that's why I figured. So there's no way to reduce this overhead on GCS unless if GCS starts to support efficient object {{rename}}. > Optimize FileBasedSink's WriteOperation.moveToOutput() > -- > > Key: BEAM-5036 > URL: https://issues.apache.org/jira/browse/BEAM-5036 > Project: Beam > Issue Type: Improvement > Components: io-java-files >Affects Versions: 2.5.0 >Reporter: Jozef Vilcek >Assignee: Tim Robertson >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > moveToOutput() methods in FileBasedSink.WriteOperation implements move by > copy+delete. It would be better to use a rename() which can be much more > effective for some filesystems. > Filesystem must support cross-directory rename. BEAM-4861 is related to this > for the case of HDFS filesystem. > Feature was discussed here: > http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()
[ https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596513#comment-16596513 ] Neville Li commented on BEAM-5036: -- {{copy+delete}} is still expensive on GCS, especially when running 10Ks of jobs writing TBs of data daily. My memory is a bit vague, but was there a time when {{AvroIO}} wrote to output files directly without a {{rename}} or {{copy+delete}}? > Optimize FileBasedSink's WriteOperation.moveToOutput() > -- > > Key: BEAM-5036 > URL: https://issues.apache.org/jira/browse/BEAM-5036 > Project: Beam > Issue Type: Improvement > Components: io-java-files >Affects Versions: 2.5.0 >Reporter: Jozef Vilcek >Assignee: Tim Robertson >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > moveToOutput() methods in FileBasedSink.WriteOperation implements move by > copy+delete. It would be better to use a rename() which can be much more > effective for some filesystems. > Filesystem must support cross-directory rename. BEAM-4861 is related to this > for the case of HDFS filesystem. > Feature was discussed here: > http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()
[ https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596410#comment-16596410 ] Neville Li commented on BEAM-5036: -- Yeah that's my main concern. We use GCS almost exclusively so all our jobs are affected by this. > Optimize FileBasedSink's WriteOperation.moveToOutput() > -- > > Key: BEAM-5036 > URL: https://issues.apache.org/jira/browse/BEAM-5036 > Project: Beam > Issue Type: Improvement > Components: io-java-files >Affects Versions: 2.5.0 >Reporter: Jozef Vilcek >Assignee: Tim Robertson >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > moveToOutput() methods in FileBasedSink.WriteOperation implements move by > copy+delete. It would be better to use a rename() which can be much more > effective for some filesystems. > Filesystem must support cross-directory rename. BEAM-4861 is related to this > for the case of HDFS filesystem. > Feature was discussed here: > http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()
[ https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596392#comment-16596392 ] Neville Li commented on BEAM-5036: -- If I understand this correctly, this issue affects all file based IOs, including Avro? We have a lot of jobs with huge Avro outputs. > Optimize FileBasedSink's WriteOperation.moveToOutput() > -- > > Key: BEAM-5036 > URL: https://issues.apache.org/jira/browse/BEAM-5036 > Project: Beam > Issue Type: Improvement > Components: io-java-files >Affects Versions: 2.5.0 >Reporter: Jozef Vilcek >Assignee: Tim Robertson >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > moveToOutput() methods in FileBasedSink.WriteOperation implements move by > copy+delete. It would be better to use a rename() which can be much more > effective for some filesystems. > Filesystem must support cross-directory rename. BEAM-4861 is related to this > for the case of HDFS filesystem. > Feature was discussed here: > http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
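The rename-versus-copy+delete distinction discussed in BEAM-5036 above can be illustrated with plain `java.nio.file` — this is not Beam's `FileBasedSink` code, just a sketch of the two strategies. On a local or HDFS-like filesystem, `Files.move` is a metadata operation, while copy+delete rewrites every byte; on an object store like GCS there is no cheap rename, which is the point of the thread:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MoveStrategies {
    /** copy+delete: reads and rewrites every byte, like FileBasedSink's current moveToOutput(). */
    static void moveByCopyDelete(Path src, Path dst) throws IOException {
        Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        Files.delete(src);
    }

    /** rename: typically a single metadata operation on filesystems that support it. */
    static void moveByRename(Path src, Path dst) throws IOException {
        Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Both methods end with the same file at `dst`; the difference is only I/O cost, which is why a filesystem-aware `rename()` path helps HDFS-style backends but cannot help GCS.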
[jira] [Commented] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275383#comment-16275383 ] Neville Li commented on BEAM-3234: -- Affects 2.2.0 as well. https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L873 > PubsubIO batch size should be configurable > -- > > Key: BEAM-3234 > URL: https://issues.apache.org/jira/browse/BEAM-3234 > Project: Beam > Issue Type: Bug > Components: sdk-java-gcp >Affects Versions: 2.1.0, 2.2.0 >Reporter: Neville Li >Priority: Minor > > Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard > coded batch size that may cause this limit to be exceeded in some cases. > https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885 > {code} > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.GenerateSequence; > import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.MapElements; > import org.apache.beam.sdk.values.TypeDescriptor; > public class Test { > public static void main(String[] args) { > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply(GenerateSequence.from(0).to(100)) > .apply(MapElements > .into(TypeDescriptor.of(String.class)) > .via(x -> { > StringBuilder b = new StringBuilder(); > for (int i = 0; i < 1000; i++) { > b.append("x"); > } > return b.toString(); > })) > .apply(PubsubIO > .writeStrings() > .to("projects/scio-playground/topics/payload-test")); > pipeline.run().waitUntilFinish(); > } > } > {code} > The above code throws the following error: > {code} > [error] Caused by: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > [error] { > [error] "code" : 400, > [error] "errors" : [ { > [error] "domain" : "global", > [error] "message" : "Request payload size exceeds the limit: 10485760 > bytes.", > [error] "reason" : "badRequest" > [error] } ], > [error] "message" : "Request payload size exceeds the limit: 10485760 > bytes.", > [error] "status" : "INVALID_ARGUMENT" > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-3234: - Affects Version/s: 2.2.0 > PubsubIO batch size should be configurable > -- > > Key: BEAM-3234 > URL: https://issues.apache.org/jira/browse/BEAM-3234 > Project: Beam > Issue Type: Bug > Components: sdk-java-gcp >Affects Versions: 2.1.0, 2.2.0 >Reporter: Neville Li >Priority: Minor > > Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard > coded batch size that may cause this limit to be exceeded in some cases. > https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885 > {code} > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.GenerateSequence; > import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.MapElements; > import org.apache.beam.sdk.values.TypeDescriptor; > public class Test { > public static void main(String[] args) { > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply(GenerateSequence.from(0).to(100)) > .apply(MapElements > .into(TypeDescriptor.of(String.class)) > .via(x -> { > StringBuilder b = new StringBuilder(); > for (int i = 0; i < 1000; i++) { > b.append("x"); > } > return b.toString(); > })) > .apply(PubsubIO > .writeStrings() > .to("projects/scio-playground/topics/payload-test")); > pipeline.run().waitUntilFinish(); > } > } > {code} > The above code throws the following error: > {code} > [error] Caused by: > com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad > Request > [error] { > [error] "code" : 400, > [error] "errors" : [ { > [error] "domain" : "global", > [error] "message" : "Request payload size exceeds the limit: 10485760 > bytes.", > [error] "reason" : "badRequest" > [error] } ], > [error] "message" : "Request payload size exceeds the limit: 10485760 > bytes.", > [error] "status" : "INVALID_ARGUMENT" > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-991) DatastoreIO Write should flush early for large batches
[ https://issues.apache.org/jira/browse/BEAM-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-991: Fix Version/s: 2.1.0 > DatastoreIO Write should flush early for large batches > -- > > Key: BEAM-991 > URL: https://issues.apache.org/jira/browse/BEAM-991 > Project: Beam > Issue Type: Bug > Components: sdk-java-gcp >Reporter: Vikas Kedigehalli >Assignee: Vikas Kedigehalli > Fix For: 2.1.0 > > > If entities are large (avg size > 20KB) then the a single batched write (500 > entities) would exceed the Datastore size limit of a single request (10MB) > from https://cloud.google.com/datastore/docs/concepts/limits. > First reported in: > http://stackoverflow.com/questions/40156400/why-does-dataflow-erratically-fail-in-datastore-access -- This message was sent by Atlassian JIRA (v6.4.14#64029)
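The "flush early" behavior requested in BEAM-991 — cap a batch by accumulated byte size, not just entity count — can be sketched in plain Java. This is an illustrative stand-in, not DatastoreIO's actual batching code; the 500-mutation and ~10 MB figures come from the issue and the Datastore limits page it cites:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Flushes a batch when either the entity count or the accumulated byte size
 * would exceed its cap (illustrative stand-in for DatastoreIO write batching).
 */
public class SizeAwareBatcher {
    private final int maxCount;
    private final long maxBytes;
    private final Consumer<List<byte[]>> sink;
    private final List<byte[]> batch = new ArrayList<>();
    private long batchBytes = 0;

    public SizeAwareBatcher(int maxCount, long maxBytes, Consumer<List<byte[]>> sink) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    public void add(byte[] entity) {
        // Flush early if adding this entity would exceed either limit.
        if (!batch.isEmpty()
                && (batch.size() + 1 > maxCount || batchBytes + entity.length > maxBytes)) {
            flush();
        }
        batch.add(entity);
        batchBytes += entity.length;
    }

    public void flush() {
        if (!batch.isEmpty()) {
            sink.accept(new ArrayList<>(batch));
            batch.clear();
            batchBytes = 0;
        }
    }
}
```

With average entity sizes above ~20 KB, the byte check trips well before the 500-entity count check, keeping each request under the 10 MB request limit.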
[jira] [Created] (BEAM-3247) Sample.any memory constraint
Neville Li created BEAM-3247: Summary: Sample.any memory constraint Key: BEAM-3247 URL: https://issues.apache.org/jira/browse/BEAM-3247 Project: Beam Issue Type: Improvement Components: sdk-java-core Affects Versions: 2.1.0 Reporter: Neville Li Assignee: Kenneth Knowles Priority: Minor Right now {{Sample.any}} converts the collection to an iterable view and takes the first n in a side input. This may require materializing the entire collection to disk and is potentially inefficient. https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74 It can be fixed by applying a truncating `DoFn` first, then a combine into a `List` which limits the list size, and finally flattening the list. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-3247) Sample.any memory constraint
[ https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li reassigned BEAM-3247: Assignee: Neville Li (was: Kenneth Knowles) > Sample.any memory constraint > > > Key: BEAM-3247 > URL: https://issues.apache.org/jira/browse/BEAM-3247 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 2.1.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > > Right now {{Sample.any}} converts the collection to an iterable view and take > first n in a side input. This may require materializing the entire collection > to disk and is potentially inefficient. > https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74 > It can be fixed by applying a truncating `DoFn` first, then a combine into > `List` which limits the list size, and finally flattening the list. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-3234: - Description: Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885 {code} import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } {code} The above code throws the following error: {code} [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" {code} was: Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. 
{{{ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } }}} The above code throws the following error: {{{ [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" }}} > PubsubIO batch size should be configurable > -- > > Key: BEAM-3234 > URL: https://issues.apache.org/jira/browse/BEAM-3234 > Project: Beam > Issue Type: Bug > Components: sdk-java-gcp >Affects Versions: 2.1.0 >Reporter: Neville Li >Priority: Minor > > Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard > coded batch size that may cause this limit to be exceeded in some cases. 
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885 > {code} > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.GenerateSequence; > import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.MapElements; > import org.apache.beam.sdk.values.TypeDescriptor; > public class Test { > public static void main(String[] args) { > PipelineOptions options = PipelineOptionsFactory.crea
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-3234: - Description: Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. {{{ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } }}} The above code throws the following error: {{{ [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" }}} was: Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. 
{{ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } }} The above code throws the following error: {{ [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" }} > PubsubIO batch size should be configurable > -- > > Key: BEAM-3234 > URL: https://issues.apache.org/jira/browse/BEAM-3234 > Project: Beam > Issue Type: Bug > Components: sdk-java-gcp >Affects Versions: 2.1.0 >Reporter: Neville Li >Priority: Minor > > Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard > coded batch size that may cause this limit to be exceeded in some cases. 
> {{{ > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.GenerateSequence; > import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.MapElements; > import org.apache.beam.sdk.values.TypeDescriptor; > public class Test { > public static void main(String[] args) { > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply(GenerateSequence.from(0).to(100)) > .apply(MapElements > .into(TypeDescriptor.of(String.class)) > .via(x -> { > StringBuilder b = new StringBuilder(); > for (int i = 0; i < 100
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-3234: - Description: Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. {{ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } }} The above code throws the following error: {{ [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" }} was: Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. 
{{import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } }} The above code throws the following error: {{ [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" }} > PubsubIO batch size should be configurable > -- > > Key: BEAM-3234 > URL: https://issues.apache.org/jira/browse/BEAM-3234 > Project: Beam > Issue Type: Bug > Components: sdk-java-gcp >Affects Versions: 2.1.0 >Reporter: Neville Li >Priority: Minor > > Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard > coded batch size that may cause this limit to be exceeded in some cases. 
> {{ > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.GenerateSequence; > import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.MapElements; > import org.apache.beam.sdk.values.TypeDescriptor; > public class Test { > public static void main(String[] args) { > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline pipeline = Pipeline.create(options); > pipeline > .apply(GenerateSequence.from(0).to(100)) > .apply(MapElements > .into(TypeDescriptor.of(String.class)) > .via(x -> { > StringBuilder b = new StringBuilder(); > for (int i = 0; i < 1000;
[jira] [Created] (BEAM-3234) PubsubIO batch size should be configurable
Neville Li created BEAM-3234: Summary: PubsubIO batch size should be configurable Key: BEAM-3234 URL: https://issues.apache.org/jira/browse/BEAM-3234 Project: Beam Issue Type: Bug Components: sdk-java-gcp Affects Versions: 2.1.0 Reporter: Neville Li Priority: Minor Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases. {{import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.TypeDescriptor; public class Test { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); pipeline .apply(GenerateSequence.from(0).to(100)) .apply(MapElements .into(TypeDescriptor.of(String.class)) .via(x -> { StringBuilder b = new StringBuilder(); for (int i = 0; i < 1000; i++) { b.append("x"); } return b.toString(); })) .apply(PubsubIO .writeStrings() .to("projects/scio-playground/topics/payload-test")); pipeline.run().waitUntilFinish(); } } }} The above code throws the following error: {{ [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request [error] { [error] "code" : 400, [error] "errors" : [ { [error] "domain" : "global", [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "reason" : "badRequest" [error] } ], [error] "message" : "Request payload size exceeds the limit: 10485760 bytes.", [error] "status" : "INVALID_ARGUMENT" }} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2960) Missing type parameter in some AvroIO.Write API
[ https://issues.apache.org/jira/browse/BEAM-2960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li reassigned BEAM-2960: Assignee: Neville Li (was: Kenneth Knowles) > Missing type parameter in some AvroIO.Write API > --- > > Key: BEAM-2960 > URL: https://issues.apache.org/jira/browse/BEAM-2960 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.1.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > > Like > {{public Write to(DynamicAvroDestinations dynamicDestinations)}} > {{public Write withSchema(Schema schema)}} > {{public Write withWindowedWrites()}} > {{public Write withMetadata(Map metadata)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2960) Missing type parameter in some AvroIO.Write API
Neville Li created BEAM-2960: Summary: Missing type parameter in some AvroIO.Write API Key: BEAM-2960 URL: https://issues.apache.org/jira/browse/BEAM-2960 Project: Beam Issue Type: Bug Components: sdk-java-core Affects Versions: 2.1.0 Reporter: Neville Li Assignee: Kenneth Knowles Priority: Minor Like {{public Write to(DynamicAvroDestinations dynamicDestinations)}} {{public Write withSchema(Schema schema)}} {{public Write withWindowedWrites()}} {{public Write withMetadata(Map metadata)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2766) HadoopInputFormatIO should support Void/null key/values
Neville Li created BEAM-2766: Summary: HadoopInputFormatIO should support Void/null key/values Key: BEAM-2766 URL: https://issues.apache.org/jira/browse/BEAM-2766 Project: Beam Issue Type: Bug Components: sdk-java-extensions Affects Versions: 2.2.0 Reporter: Neville Li Assignee: Reuven Lax Priority: Minor Many Hadoop {{InputFormat}} implementations use {{Void}} as the key/value type and generate null values, which causes a {{NullPointerException}} in https://github.com/apache/beam/blob/master/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java#L714 {{HadoopInputFormatIO}} should ignore these and not clone them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
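The guard the issue asks for amounts to passing null through instead of handing it to the coder-based clone. A minimal sketch under that assumption; `NullSafeClone` and `Cloner` are hypothetical stand-ins, not the actual {{HadoopInputFormatIO}} internals:
{code}
public class NullSafeClone {
  // Stand-in for whatever performs the coder-based deep copy.
  public interface Cloner<T> {
    T clone(T value);
  }

  // Void-typed InputFormats hand back null keys/values; cloning them
  // would NPE, so pass the null through unchanged instead.
  public static <T> T cloneIfPresent(T value, Cloner<T> cloner) {
    if (value == null) {
      return null;
    }
    return cloner.clone(value);
  }
}
{code}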
[jira] [Created] (BEAM-2765) HadoopInputFormatIO should support custom key/value coder
Neville Li created BEAM-2765: Summary: HadoopInputFormatIO should support custom key/value coder Key: BEAM-2765 URL: https://issues.apache.org/jira/browse/BEAM-2765 Project: Beam Issue Type: Improvement Components: sdk-java-extensions Affects Versions: 2.2.0 Reporter: Neville Li Assignee: Reuven Lax Priority: Minor Right now {{HadoopInputFormatIO}} infers coders with {{getDefaultCoder}} but this doesn't work for cases like {{AvroCoder}} with {{GenericRecord}}, for example, when Avro is used with Parquet. https://github.com/apache/beam/blob/master/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java#L288 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097699#comment-16097699 ] Neville Li commented on BEAM-2658: -- However I'd still argue that {{DefaultCoder}} and {{SerializableCoder}} are two special cases: {{DefaultCoder}} should have the highest precedence and override type-based lookup, while {{SerializableCoder}} should have the lowest, as a fallback. Other coders in between don't really overlap. > SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {code} > import com.google.protobuf.Timestamp; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.coders.CannotProvideCoderException; > import org.apache.beam.sdk.coders.Coder; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > {code} > Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but > {{SerializableCoderProvider}} should be added last as a fallback if there're > other {{CoderProvider}}s that support the same type. > {code} > Set registrars = > Sets.newTreeSet(ObjectsClassComparator.INSTANCE); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097693#comment-16097693 ] Neville Li commented on BEAM-2658: -- Types covered by each {{CoderProvider}} may overlap, and we might want to sort them based on scope and not name. Two ideas:
1. Add a `List getSupportedTypes()` and make sure that when two providers overlap, the one with the narrower supported types gets precedence. This won't work with `DefaultCoder` though. Also, a class can implement multiple interfaces, and this doesn't specify which one has higher precedence. For example `ProtoCoder` supports `Message.class` while `SerializableCoder` supports `Serializable.class`, but generated Protobuf classes also implement `Serializable`.
2. Assign an arbitrary int precedence to each coder, similar to UNIX rc/motd files, and store registered coders in a `TreeMap>`. Not elegant but works.
> SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {code} > import com.google.protobuf.Timestamp; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.coders.CannotProvideCoderException; > import org.apache.beam.sdk.coders.Coder; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > {code} > Right now we're sorting 
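Idea 2 above can be sketched roughly as follows. `PrecedenceRegistry` is a hypothetical illustration (with provider names as plain strings), not the real {{CoderRegistry}}; the point is that a `TreeMap` keyed on an explicit precedence lets a fallback like {{SerializableCoderProvider}} be parked at a large value so narrower providers win:
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrecedenceRegistry {
  // Lower key = higher priority; ties within a key keep insertion order.
  private final TreeMap<Integer, List<String>> providers = new TreeMap<>();

  public void register(int precedence, String providerName) {
    providers.computeIfAbsent(precedence, k -> new ArrayList<>()).add(providerName);
  }

  // Returns the provider with the lowest precedence value, or null if empty.
  public String firstMatch() {
    Map.Entry<Integer, List<String>> first = providers.firstEntry();
    return first == null ? null : first.getValue().get(0);
  }
}
{code}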
{{CoderProviderRegistrar}}s by canonical name but > {{SerializableCoderProvider}} should be added last as a fallback if there're > other {{CoderProvider}}s that support the same type. > {code} > Set registrars = > Sets.newTreeSet(ObjectsClassComparator.INSTANCE); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-2658: - Description: {code} import com.google.protobuf.Timestamp; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code} Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but {{SerializableCoderProvider}} should be added last as a fallback if there're other {{CoderProvider}}s that support the same type. 
{code} Set registrars = Sets.newTreeSet(ObjectsClassComparator.INSTANCE); {code} was: {code} import com.google.protobuf.Timestamp; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code} > SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {code} > import com.google.protobuf.Timestamp; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.coders.CannotProvideCoderException; > import org.apache.beam.sdk.coders.Coder; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > {code} > Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but > {{SerializableCoderProvider}} should be added last as a fallback if there're > other {{CoderProvider}}s 
that support the same type. > {code} > Set registrars = > Sets.newTreeSet(ObjectsClassComparator.INSTANCE); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-2658: - Description: {code) public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code) was: {{ public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } }} > SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {code) > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > {code) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-2658: - Description: {code} import com.google.protobuf.Timestamp; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code} was: {code} public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code} > SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {code} > import com.google.protobuf.Timestamp; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.coders.CannotProvideCoderException; > import org.apache.beam.sdk.coders.Coder; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > 
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-2658: - Description: {code} public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code} was: {code) public class CoderTest { public static void main(String[] args) throws CannotProvideCoderException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline pipeline = Pipeline.create(options); Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class); // class org.apache.beam.sdk.coders.SerializableCoder System.out.println(coder.getClass()); } } {code) > SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {code} > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2658) SerializableCoder has high precedence over ProtoCoder in CoderRegistry#getCoder
Neville Li created BEAM-2658: Summary: SerializableCoder has high precedence over ProtoCoder in CoderRegistry#getCoder Key: BEAM-2658 URL: https://issues.apache.org/jira/browse/BEAM-2658 Project: Beam Issue Type: Bug Components: sdk-java-core Affects Versions: 2.0.0 Reporter: Neville Li Assignee: Davor Bonaci Priority: Minor
{code}
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    Coder coder = pipeline.getCoderRegistry().getCoder(Timestamp.class);
    // class org.apache.beam.sdk.coders.SerializableCoder
    System.out.println(coder.getClass());
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder
[ https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-2658: - Summary: SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder (was: SerializableCoder has high precedence over ProtoCoder in CoderRegistry#getCoder) > SerializableCoder has higher precedence over ProtoCoder in > CoderRegistry#getCoder > - > > Key: BEAM-2658 > URL: https://issues.apache.org/jira/browse/BEAM-2658 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.0.0 >Reporter: Neville Li >Assignee: Davor Bonaci >Priority: Minor > > {{ > public class CoderTest { > public static void main(String[] args) throws CannotProvideCoderException { > PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); > Pipeline pipeline = Pipeline.create(options); > Coder coder = > pipeline.getCoderRegistry().getCoder(Timestamp.class); > // class org.apache.beam.sdk.coders.SerializableCoder > System.out.println(coder.getClass()); > } > } > }} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2453) The Java DirectRunner should exercise all parts of a CombineFn
[ https://issues.apache.org/jira/browse/BEAM-2453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093359#comment-16093359 ] Neville Li commented on BEAM-2453: -- Here's an example of incorrect use of {{Combine.perKey}} that could be identified by this fix: https://github.com/spotify/scio/issues/729 > The Java DirectRunner should exercise all parts of a CombineFn > -- > > Key: BEAM-2453 > URL: https://issues.apache.org/jira/browse/BEAM-2453 > Project: Beam > Issue Type: Bug > Components: runner-direct >Reporter: Thomas Groh >Assignee: Thomas Groh > > Specifically it should: > Create some number of accumulators; add elements to these accumulators, merge > the created accumulators, and extract the output. > This can be performed by replacing the {{Combine.perKey}} composite transform > with a multi-step combine {{CombineBundles -> GroupByKey -> > MergeAccumulators}} > Where {{CombineBundles}} is a {{ParDo}} which takes input {{KV}} > and produces {{KV}}, outputting in {{FinishBundle}} (this can only > be performed if the Combine takes no side inputs or does not have merging > windows). {{MergeAccumulators}} takes in {{KV>}} and > produces {{KV}} by merging all of the accumulators and extracting > the output. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
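The failure mode in the linked Scio issue is a combine that only works when a single accumulator sees all inputs. A toy standalone sketch (not DirectRunner or Beam code) of what "exercising all parts" means: force every stage — create, add, merge, extract — to run, so a combine whose merge step is wrong cannot pass by accident:
{code}
import java.util.List;

public class CombineAllParts {
  // Sum via two accumulators so the merge step is always exercised.
  public static int combineSum(List<Integer> inputs) {
    int[] accA = {0};  // createAccumulator (x2, to force a merge)
    int[] accB = {0};
    for (int i = 0; i < inputs.size(); i++) {
      int[] acc = (i % 2 == 0) ? accA : accB;
      acc[0] += inputs.get(i);  // addInput
    }
    int merged = accA[0] + accB[0];  // mergeAccumulators
    return merged;                   // extractOutput
  }
}
{code}
A combine that, say, ignored one accumulator during merge would return the wrong total here, whereas a single-accumulator execution path would hide the bug.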
[jira] [Commented] (BEAM-2532) BigQueryIO source should avoid expensive JSON schema parsing for every record
[ https://issues.apache.org/jira/browse/BEAM-2532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090599#comment-16090599 ] Neville Li commented on BEAM-2532: -- Would love to see a fix in the next release. This is a big performance regression for us since we use BigQuery heavily. > BigQueryIO source should avoid expensive JSON schema parsing for every record > - > > Key: BEAM-2532 > URL: https://issues.apache.org/jira/browse/BEAM-2532 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Affects Versions: 2.0.0 >Reporter: Marian Dvorsky >Assignee: Chamikara Jayalath >Priority: Minor > > BigQueryIO source converts the schema from JSON for every input row, here: > https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L159 > This is the performance bottleneck in a simple pipeline with BigQueryIO > source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
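The fix shape is simply parse-once-and-memoize. A minimal sketch under that assumption; `SchemaCache` is hypothetical, with a plain `Object` standing in for the real parsed-schema type and `parse` standing in for the expensive JSON parsing call:
{code}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SchemaCache {
  private static final ConcurrentMap<String, Object> CACHE = new ConcurrentHashMap<>();
  // Counts how many times the expensive parse actually ran.
  static final AtomicInteger PARSE_COUNT = new AtomicInteger();

  // Stand-in for the per-record JSON schema parsing at BigQuerySourceBase.java#L159.
  private static Object parse(String json) {
    PARSE_COUNT.incrementAndGet();
    return new Object();
  }

  // Parse each distinct schema string once; every later record reuses it.
  public static Object get(String json) {
    return CACHE.computeIfAbsent(json, SchemaCache::parse);
  }
}
{code}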
[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam
[ https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996831#comment-15996831 ] Neville Li commented on BEAM-302: - Yes, that ecosystem has too many build parameters: Scala version, Spark version, Hadoop version, etc. Scala 2.10 is outdated, quite different from 2.11/2.12, and hard to maintain; that's why we stopped supporting it. 2.12 support should be available soon once some compiler lambda serialization issues are addressed. > Add Scio Scala DSL to Beam > -- > > Key: BEAM-302 > URL: https://issues.apache.org/jira/browse/BEAM-302 > Project: Beam > Issue Type: Wish > Components: sdk-ideas >Reporter: Jean-Baptiste Onofré > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam
[ https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995075#comment-15995075 ] Neville Li commented on BEAM-302: - Looks like the Spark runner still depends on Spark 1.6.3. Can you give Spark 1.6 a shot instead? https://mvnrepository.com/artifact/org.apache.beam/beam-runners-spark/0.6.0 We'd love to support all runners, but we only use the Dataflow runner and vanilla Spark ourselves. Contributions would be awesome and are definitely welcome. Feel free to submit issues or PRs on our GitHub repo. There's also a Gitter room and a Google group for discussions. https://github.com/spotify/scio > Add Scio Scala DSL to Beam > -- > > Key: BEAM-302 > URL: https://issues.apache.org/jira/browse/BEAM-302 > Project: Beam > Issue Type: Wish > Components: sdk-ideas >Reporter: Jean-Baptiste Onofré > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam
[ https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994005#comment-15994005 ] Neville Li commented on BEAM-302: - You need the spark runner dependency which is not included by default. > Add Scio Scala DSL to Beam > -- > > Key: BEAM-302 > URL: https://issues.apache.org/jira/browse/BEAM-302 > Project: Beam > Issue Type: Wish > Components: sdk-ideas >Reporter: Jean-Baptiste Onofré > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam
[ https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953584#comment-15953584 ] Neville Li commented on BEAM-302: - We prefer to keep it separate for now, mainly for logistics reasons:
- we use SBT with lots of custom logic
- we release very often, once every 1-2 weeks
- we monkey-patch Beam bugs and test them in our production jobs before the upstream Beam release
- we use a lightweight collaboration model, mainly just GitHub issues & PRs
- there are only 3 Scio developers at Spotify supporting 150+ internal users and many external ones, all running on Dataflow
However, I also want to point out that nothing should stop those interested from trying it out or contributing:
- we decoupled the Dataflow runner as much as possible
- Scio should run on other runners without modification; it's just a matter of changing dependencies and arguments
- there are still parts coupled with GCP and the Dataflow runner, but hopefully we can gradually decouple them as the file system and other related APIs improve
- it'd be great to see bug reports and PRs from the community
> Add Scio Scala DSL to Beam > -- > > Key: BEAM-302 > URL: https://issues.apache.org/jira/browse/BEAM-302 > Project: Beam > Issue Type: Wish > Components: sdk-ideas >Reporter: Jean-Baptiste Onofré > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-302) Add Scio Scala DSL to Beam
[ https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li reassigned BEAM-302: --- Assignee: (was: Neville Li) > Add Scio Scala DSL to Beam > -- > > Key: BEAM-302 > URL: https://issues.apache.org/jira/browse/BEAM-302 > Project: Beam > Issue Type: Wish > Components: sdk-ideas >Reporter: Jean-Baptiste Onofré > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (BEAM-1518) Support deflate (zlib) in CompressedSource and FileBasedSink
[ https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li closed BEAM-1518. Resolution: Fixed > Support deflate (zlib) in CompressedSource and FileBasedSink > > > Key: BEAM-1518 > URL: https://issues.apache.org/jira/browse/BEAM-1518 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > Fix For: 0.6.0 > > > `.deflate` files are quite common in Hadoop and also supported by TensorFlow > in TFRecord file format. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
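For reference, the `.deflate` format this issue added is zlib-wrapped deflate, which `java.util.zip` already handles. A minimal standalone round-trip sketch (not Beam's `CompressedSource`/`FileBasedSink` code):
{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class DeflateRoundTrip {
  // Compress bytes into the zlib ("deflate") format .deflate files use.
  public static byte[] compress(byte[] data) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) {
      out.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }

  // Decompress zlib-wrapped bytes back to the original payload.
  public static byte[] decompress(byte[] data) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(data))) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        bos.write(buf, 0, n);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bos.toByteArray();
  }
}
{code}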
[jira] [Updated] (BEAM-1518) Support deflate (zlib) in CompressedSource and FileBasedSink
[ https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-1518: - Fix Version/s: 0.6.0 > Support deflate (zlib) in CompressedSource and FileBasedSink > > > Key: BEAM-1518 > URL: https://issues.apache.org/jira/browse/BEAM-1518 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > Fix For: 0.6.0 > > > `.deflate` files are quite common in Hadoop and also supported by TensorFlow > in TFRecord file format. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1520) Implement TFRecordIO (Reading/writing Tensorflow Standard format)
Neville Li created BEAM-1520: Summary: Implement TFRecordIO (Reading/writing Tensorflow Standard format) Key: BEAM-1520 URL: https://issues.apache.org/jira/browse/BEAM-1520 Project: Beam Issue Type: Improvement Components: sdk-java-core Affects Versions: 0.5.0 Reporter: Neville Li Assignee: Davor Bonaci Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1520) Implement TFRecordIO (Reading/writing Tensorflow Standard format)
[ https://issues.apache.org/jira/browse/BEAM-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li reassigned BEAM-1520: Assignee: Neville Li (was: Davor Bonaci) > Implement TFRecordIO (Reading/writing Tensorflow Standard format) > - > > Key: BEAM-1520 > URL: https://issues.apache.org/jira/browse/BEAM-1520 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1519) Support snappy in CompressedSource and FileBasedSink
[ https://issues.apache.org/jira/browse/BEAM-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li reassigned BEAM-1519: Assignee: (was: Neville Li) > Support snappy in CompressedSource and FileBasedSink > > > Key: BEAM-1519 > URL: https://issues.apache.org/jira/browse/BEAM-1519 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1518) Support deflate (zlib) in CompressedSource and FileBasedSink
[ https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-1518: - Summary: Support deflate (zlib) in CompressedSource and FileBasedSink (was: Support ZLIB (deflate) in CompressedSource and FileBasedSink) > Support deflate (zlib) in CompressedSource and FileBasedSink > > > Key: BEAM-1518 > URL: https://issues.apache.org/jira/browse/BEAM-1518 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > > `.deflate` files are quite common in Hadoop and also supported by TensorFlow > in TFRecord file format. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1518) Support ZLIB (deflate) in CompressedSource and FileBasedSink
[ https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-1518: - Description: `.deflate` files are quite common in Hadoop and also supported by TensorFlow in TFRecord file format. > Support ZLIB (deflate) in CompressedSource and FileBasedSink > > > Key: BEAM-1518 > URL: https://issues.apache.org/jira/browse/BEAM-1518 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > > `.deflate` files are quite common in Hadoop and also supported by TensorFlow > in TFRecord file format. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1519) CLONE - Support snappy in CompressedSource and FileBasedSink
Neville Li created BEAM-1519: Summary: CLONE - Support snappy in CompressedSource and FileBasedSink Key: BEAM-1519 URL: https://issues.apache.org/jira/browse/BEAM-1519 Project: Beam Issue Type: Improvement Components: sdk-java-core Affects Versions: 0.5.0 Reporter: Neville Li Assignee: Neville Li Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1519) Support snappy in CompressedSource and FileBasedSink
[ https://issues.apache.org/jira/browse/BEAM-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neville Li updated BEAM-1519: - Summary: Support snappy in CompressedSource and FileBasedSink (was: CLONE - Support snappy in CompressedSource and FileBasedSink) > Support snappy in CompressedSource and FileBasedSink > > > Key: BEAM-1519 > URL: https://issues.apache.org/jira/browse/BEAM-1519 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Affects Versions: 0.5.0 >Reporter: Neville Li >Assignee: Neville Li >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1518) Support ZLIB (deflate) in CompressedSource and FileBasedSink
Neville Li created BEAM-1518: Summary: Support ZLIB (deflate) in CompressedSource and FileBasedSink Key: BEAM-1518 URL: https://issues.apache.org/jira/browse/BEAM-1518 Project: Beam Issue Type: Improvement Components: sdk-java-core Affects Versions: 0.5.0 Reporter: Neville Li Assignee: Neville Li Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-298) Make TestPipeline implement the TestRule interface
[ https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864087#comment-15864087 ] Neville Li commented on BEAM-298: - Turns out I referenced {{TestPipeline}} in our `src/main` for some reason. I'll work around it locally. > Make TestPipeline implement the TestRule interface > -- > > Key: BEAM-298 > URL: https://issues.apache.org/jira/browse/BEAM-298 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Thomas Groh >Assignee: Stas Levin >Priority: Minor > Fix For: 0.5.0 > > > https://github.com/junit-team/junit4/wiki/Rules > A JUnit Rule allows a test to use a field annotated with @Rule to wrap > executing tests. Doing so allows the TestPipeline to, at the time the test > completes, assert that all applied transforms have been executed. This > ensures that all unit tests that utilize a TestPipeline rule either are > declared to explicitly not expect to execute or have executed the pipeline. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-298) Make TestPipeline implement the TestRule interface
[ https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862201#comment-15862201 ] Neville Li edited comment on BEAM-298 at 2/11/17 4:12 AM: -- That didn't work for me. I had to add it as a {{compile}} scope. was (Author: sinisa_lyh): That didn't work for me. I had to add it as a {compile} scope. > Make TestPipeline implement the TestRule interface > -- > > Key: BEAM-298 > URL: https://issues.apache.org/jira/browse/BEAM-298 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Thomas Groh >Assignee: Stas Levin >Priority: Minor > Fix For: 0.5.0 > > > https://github.com/junit-team/junit4/wiki/Rules > A JUnit Rule allows a test to use a field annotated with @Rule to wrap > executing tests. Doing so allows the TestPipeline to, at the time the test > completes, assert that all applied transforms have been executed. This > ensures that all unit tests that utilize a TestPipeline rule either are > declared to explicitly not expect to execute or have executed the pipeline. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-298) Make TestPipeline implement the TestRule interface
[ https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862201#comment-15862201 ] Neville Li commented on BEAM-298: - That didn't work for me. I had to add it as a {{compile}} scope. > Make TestPipeline implement the TestRule interface > -- > > Key: BEAM-298 > URL: https://issues.apache.org/jira/browse/BEAM-298 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Thomas Groh >Assignee: Stas Levin >Priority: Minor > Fix For: 0.5.0 > > > https://github.com/junit-team/junit4/wiki/Rules > A JUnit Rule allows a test to use a field annotated with @Rule to wrap > executing tests. Doing so allows the TestPipeline to, at the time the test > completes, assert that all applied transforms have been executed. This > ensures that all unit tests that utilize a TestPipeline rule either are > declared to explicitly not expect to execute or have executed the pipeline. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
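In an sbt build (the project in question, Scio, uses sbt), the compile-scope workaround described above might look like this; the JUnit version shown is illustrative of the JUnit 4 era and is an assumption, not taken from the thread:

```scala
// build.sbt
// No "% Test" qualifier: sbt's default configuration is Compile,
// which keeps org.junit.rules.TestRule on the main runtime classpath.
libraryDependencies += "junit" % "junit" % "4.12"
```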
[jira] [Comment Edited] (BEAM-298) Make TestPipeline implement the TestRule interface
[ https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861795#comment-15861795 ] Neville Li edited comment on BEAM-298 at 2/10/17 8:08 PM: -- As a result of this change I need to include {{junit}} in my dependencies or it'll fail with a class not found exception. Is this expected? {code} java.lang.NoClassDefFoundError: org/junit/rules/TestRule at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.spotify.scio.ContextAndArgs$.apply(ScioContext.scala:65) at com.spotify.scio.examples.MinimalWordCount$.main(MinimalWordCount.scala:35) at com.spotify.scio.examples.MinimalWordCount.main(MinimalWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sbt.Run.invokeMain(Run.scala:67) at sbt.Run.run0(Run.scala:61) at sbt.Run.sbt$Run$$execute$1(Run.scala:51) at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55) at sbt.Run$$anonfun$run$1.apply(Run.scala:55) at sbt.Run$$anonfun$run$1.apply(Run.scala:55) at sbt.Logger$$anon$4.apply(Logger.scala:84) at sbt.TrapExit$App.run(TrapExit.scala:248) at java.lang.Thread.run(Thread.java:745) Caused by: 
java.lang.ClassNotFoundException: org.junit.rules.TestRule at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) {code} was (Author: sinisa_lyh): As a result of this change I need to include {{junit}} in my dependencies or it'll fail with a class not found exception. Is this expected? {{ java.lang.NoClassDefFoundError: org/junit/rules/TestRule at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.spotify.scio.ContextAndArgs$.apply(ScioContext.scala:65) at com.spotify.scio.examples.MinimalWordCount$.main(MinimalWordCount.scala:35) at com.spotify.scio.examples.MinimalWordCount.main(MinimalWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sbt.Run.invokeMain(Run.scala:67) at sbt.Run.run0(Run.scala:61) at sbt.Run.sbt$Run$$execute$1(Run.scala:51) at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55) at sbt.Run$$anonfun$run$1.apply(Run.scala:55) at sbt.Run$$anonfun$run$1.apply(Run.scala:55) at sbt.Logger$$anon$4.apply(Logger.scala:84) at sbt.TrapExit$App.run(TrapExit.scala:248) at 
java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.junit.rules.TestRule at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) }} > Make TestPipeline implement the TestRule interface > -- > > Key: BEAM-298 > URL: https:
[jira] [Commented] (BEAM-298) Make TestPipeline implement the TestRule interface
[ https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861795#comment-15861795 ] Neville Li commented on BEAM-298: - As a result of this change I need to include {{junit}} in my dependencies or it'll fail with a class not found exception. Is this expected? {code} java.lang.NoClassDefFoundError: org/junit/rules/TestRule at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at com.spotify.scio.ContextAndArgs$.apply(ScioContext.scala:65) at com.spotify.scio.examples.MinimalWordCount$.main(MinimalWordCount.scala:35) at com.spotify.scio.examples.MinimalWordCount.main(MinimalWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sbt.Run.invokeMain(Run.scala:67) at sbt.Run.run0(Run.scala:61) at sbt.Run.sbt$Run$$execute$1(Run.scala:51) at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55) at sbt.Run$$anonfun$run$1.apply(Run.scala:55) at sbt.Run$$anonfun$run$1.apply(Run.scala:55) at sbt.Logger$$anon$4.apply(Logger.scala:84) at sbt.TrapExit$App.run(TrapExit.scala:248) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException:
org.junit.rules.TestRule at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) {code} > Make TestPipeline implement the TestRule interface > -- > > Key: BEAM-298 > URL: https://issues.apache.org/jira/browse/BEAM-298 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Thomas Groh >Assignee: Stas Levin >Priority: Minor > Fix For: 0.5.0 > > > https://github.com/junit-team/junit4/wiki/Rules > A JUnit Rule allows a test to use a field annotated with @Rule to wrap > executing tests. Doing so allows the TestPipeline to, at the time the test > completes, assert that all applied transforms have been executed. This > ensures that all unit tests that utilize a TestPipeline rule either are > declared to explicitly not expect to execute or have executed the pipeline. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam
[ https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836553#comment-15836553 ] Neville Li commented on BEAM-302: - WIP branch here using 0.4.0 https://github.com/spotify/scio/tree/apache-beam Ticket https://github.com/spotify/scio/issues/279 > Add Scio Scala DSL to Beam > -- > > Key: BEAM-302 > URL: https://issues.apache.org/jira/browse/BEAM-302 > Project: Beam > Issue Type: Wish > Components: sdk-ideas >Reporter: Jean-Baptiste Onofré >Assignee: Neville Li > -- This message was sent by Atlassian JIRA (v6.3.4#6332)