[jira] [Commented] (BEAM-10068) Modify behavior of Dynamic Destinations
[ https://issues.apache.org/jira/browse/BEAM-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17114533#comment-17114533 ] Reuven Lax commented on BEAM-10068: --- It is reasonable to allow specifying a per-destination numShards. If this is what's wanted, please file a feature request. The number of destinations is controlled by the user, as are the windowing and triggering policies. > Modify behavior of Dynamic Destinations > --- > > Key: BEAM-10068 > URL: https://issues.apache.org/jira/browse/BEAM-10068 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Mihir Borkar >Priority: P2 > > The writeDynamic() method, implementing Dynamic Destinations, writes files per > destination per window per pane. > This leads to an increase in the number of files generated. > The request is as follows: > A way to make it possible for the user to modify the behavior of Dynamic > Destinations to control the number of output files being produced. > a.) We can consider adding user-configurable parameters like writers per > bundle, increasing the number of records processed per bundle > and/or > b.) Introduce a method implementing Dynamic Destinations but more dependent > on the data passing through the pipeline, instead of windows/panes. > So instead of splitting every output file into roughly the number of > destinations being written to, we let the user configure how output files > should be divided across destinations. > Links: > [1] > [https://beam.apache.org/releases/javadoc/2.19.0/index.html?org/apache/beam/sdk/io/FileIO.html] > [2] > [https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
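To make the growth in file count concrete: with a fixed shard count, each destination writes its shards once per window and pane, so the dimensions multiply. The plain-Java sketch below is arithmetic only, not Beam code. It assumes every destination receives data in every window and pane (an upper bound), and the per-destination shard map is hypothetical: it models the feature request suggested in the comment, not an existing FileIO option.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical arithmetic model of output file counts for a dynamic-destination write. */
final class DynamicWriteFileCount {
  private DynamicWriteFileCount() {}

  /** Same shard count everywhere: files = destinations x windows x panes x shards. */
  static long uniformShards(int destinations, int windows, int panesPerWindow, int numShards) {
    return (long) destinations * windows * panesPerWindow * numShards;
  }

  /** Hypothetical per-destination shard counts: each destination contributes its own shards per pane. */
  static long perDestinationShards(
      Map<String, Integer> shardsByDestination, int windows, int panesPerWindow) {
    long total = 0;
    for (int shards : shardsByDestination.values()) {
      total += (long) shards * windows * panesPerWindow;
    }
    return total;
  }

  public static void main(String[] args) {
    // 50 destinations, 24 windows with one pane each, 10 shards for every destination.
    System.out.println("uniform: " + uniformShards(50, 24, 1, 10));

    // One large destination keeps 10 shards; the 49 small ones get a single shard each.
    Map<String, Integer> shards = new HashMap<>();
    shards.put("large", 10);
    for (int i = 0; i < 49; i++) {
      shards.put("small-" + i, 1);
    }
    System.out.println("per-destination: " + perDestinationShards(shards, 24, 1));
  }
}
```

With these example numbers the uniform setting yields 12,000 files and the per-destination setting 1,416.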
[jira] [Commented] (BEAM-10053) Timers exception on "Job Drain" while using stateful beam processing in global window
[ https://issues.apache.org/jira/browse/BEAM-10053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17113286#comment-17113286 ] Reuven Lax commented on BEAM-10053: --- Something appears off here: *org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z* However, the end of the GlobalWindow should be 9223371950454775 (milliseconds since the epoch), which is 294247-01-09T04:00:54.775Z. I don't see any allowed lateness set on the global window. Luke, do you have any idea why there is a difference of more than a year between these timestamps? > Timers exception on "Job Drain" while using stateful beam processing in > global window > - > > Key: BEAM-10053 > URL: https://issues.apache.org/jira/browse/BEAM-10053 > Project: Beam > Issue Type: Bug > Components: runner-dataflow, sdk-java-core >Affects Versions: 2.19.0 >Reporter: MOHIL >Priority: P2 > > Hello, > > I have a use case where I have two sets of PCollections (RecordA and RecordB) > coming from a real-time streaming source like Kafka. > > Both records are correlated by a common key, let's say KEY. > > The purpose is to enrich RecordA with RecordB's data, for which I am using > CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 > minutes of event time, I am maintaining a sliding window for both records and > then applying CoGroupByKey to both PCollections. > > The sliding windows that find both RecordA and RecordB for a common key > KEY will emit enriched output. Now, since multiple sliding windows can emit > the same output, I finally remove duplicate results by feeding the aforementioned > outputs to a global window where I maintain state to check whether an output > has already been processed or not. Since it is a global window, I maintain a > Timer on the state (for GC) to let it expire after 10 minutes have elapsed since > the state was written. > > This works perfectly fine w.r.t. the expected results.
However, I am > unable to stop the job gracefully, i.e. drain it. I see the following > exception: > > java.lang.IllegalStateException: > org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state > cleanup timer for window > org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before > the appropriate cleanup time 294248-01-24T04:00:54.776Z > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354) > org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52) > org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316) > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149) > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049) > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > java.lang.IllegalStateException: > org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received state > cleanup timer for window > org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before > the appropriate cleanup time
294248-01-24T04:00:54.776Z > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467) > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354) > org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52) >
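The timestamps in the comment can be checked with plain java.time arithmetic. This sketch assumes Beam's constants as we understand them: the maximum timestamp is Long.MAX_VALUE microseconds expressed in milliseconds, and the global window ends one day before it. It does not reproduce the Dataflow worker's own cleanup-time computation; it only confirms the 9223371950454775 figure and measures how far the reported cleanup time lies beyond it.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/** Checks the global-window timestamps quoted in the comment, under the assumed constants. */
final class GlobalWindowTimestamps {
  private GlobalWindowTimestamps() {}

  static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);

  /** Assumed maximum timestamp: Long.MAX_VALUE microseconds, truncated to milliseconds. */
  static long maxTimestampMillis() {
    return TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
  }

  /** Assumed end of the global window: one day before the maximum timestamp. */
  static long endOfGlobalWindowMillis() {
    return maxTimestampMillis() - MILLIS_PER_DAY;
  }

  /** The cleanup time from the exception message, 294248-01-24T04:00:54.776Z, in epoch millis. */
  static long reportedCleanupMillis() {
    return LocalDateTime.of(294248, 1, 24, 4, 0, 54, 776_000_000)
        .toInstant(ZoneOffset.UTC)
        .toEpochMilli();
  }

  public static void main(String[] args) {
    long end = endOfGlobalWindowMillis();
    long gap = reportedCleanupMillis() - end;
    System.out.println("end of global window: " + end + " ms = " + Instant.ofEpochMilli(end));
    System.out.println("gap to reported cleanup time: "
        + (gap / MILLIS_PER_DAY) + " days + " + (gap % MILLIS_PER_DAY) + " ms");
  }
}
```

The gap comes out at 380 days plus 1 ms, so the reported cleanup time is more than a year past the end of the global window.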
[jira] [Reopened] (BEAM-3489) Expose the message id of received messages within PubsubMessage
[ https://issues.apache.org/jira/browse/BEAM-3489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax reopened BEAM-3489: -- This feature does not work correctly on Dataflow. Dataflow only returns a PubsubMessage if PUBSUB_SERIALIZED_ATTRIBUTES_FN is set; otherwise it returns only PubsubMessage.message. > Expose the message id of received messages within PubsubMessage > --- > > Key: BEAM-3489 > URL: https://issues.apache.org/jira/browse/BEAM-3489 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Luke Cwik >Assignee: Thinh Ha >Priority: P3 > Labels: newbie, starter > Fix For: 2.16.0 > > Time Spent: 8h 20m > Remaining Estimate: 0h > > This task is about passing forward the message id from the Pub/Sub proto to > the Java PubsubMessage. > Add a message id field to PubsubMessage. > Update the coder for PubsubMessage to encode the message id. > Update the translation from the Pubsub proto message to the Dataflow message: > https://github.com/apache/beam/blob/2e275264b21db45787833502e5e42907b05e28b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L976
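The task list asks for a message id field and a coder that encodes it. The stdlib sketch below illustrates only that coder change; MessageWithId and MessageWithIdCoder are hypothetical names, not Beam's PubsubMessage or its coders, and the layout (length-prefixed payload, sorted attributes, then a presence flag and the id) is just one reasonable choice. It does not model the Dataflow-specific behaviour described in the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a received message: payload, attributes and an optional message id. */
final class MessageWithId {
  final byte[] payload;
  final Map<String, String> attributes;
  final String messageId; // null when the source did not supply one

  MessageWithId(byte[] payload, Map<String, String> attributes, String messageId) {
    this.payload = payload;
    this.attributes = attributes;
    this.messageId = messageId;
  }
}

/** Hypothetical coder that appends the message id after the existing fields. */
final class MessageWithIdCoder {
  private MessageWithIdCoder() {}

  static byte[] encode(MessageWithId message) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(message.payload.length);
      out.write(message.payload);
      // Sort attributes so equal messages always encode to identical bytes.
      Map<String, String> sorted = new TreeMap<>(message.attributes);
      out.writeInt(sorted.size());
      for (Map.Entry<String, String> e : sorted.entrySet()) {
        out.writeUTF(e.getKey());
        out.writeUTF(e.getValue());
      }
      // The new field: a presence flag first, so messages without an id still round-trip.
      out.writeBoolean(message.messageId != null);
      if (message.messageId != null) {
        out.writeUTF(message.messageId);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static MessageWithId decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      int attributeCount = in.readInt();
      Map<String, String> attributes = new HashMap<>();
      for (int i = 0; i < attributeCount; i++) {
        String key = in.readUTF();
        attributes.put(key, in.readUTF());
      }
      String messageId = in.readBoolean() ? in.readUTF() : null;
      return new MessageWithId(payload, attributes, messageId);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> attributes = new HashMap<>();
    attributes.put("source", "example");
    MessageWithId original =
        new MessageWithId("hello".getBytes(StandardCharsets.UTF_8), attributes, "1234567890");
    MessageWithId copy = decode(encode(original));
    System.out.println(copy.messageId + " " + new String(copy.payload, StandardCharsets.UTF_8));
  }
}
```

In a real pipeline, changing an existing coder's wire format also affects compatibility with data encoded by the old coder, which is one reason to add the field as a clearly delimited extension.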
[jira] [Resolved] (BEAM-10015) output timestamp not properly propagated through the Dataflow runner
[ https://issues.apache.org/jira/browse/BEAM-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-10015. --- Fix Version/s: 2.21.0 Resolution: Fixed > output timestamp not properly propagated through the Dataflow runner > > > Key: BEAM-10015 > URL: https://issues.apache.org/jira/browse/BEAM-10015 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Reuven Lax >Priority: Critical > Fix For: 2.21.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Dataflow runner does not propagate the output timestamp into timer firing, > resulting in incorrect default timestamps when outputting from a processTimer.
[jira] [Created] (BEAM-10015) output timestamp not properly propagated through the Dataflow runner
Reuven Lax created BEAM-10015: - Summary: output timestamp not properly propagated through the Dataflow runner Key: BEAM-10015 URL: https://issues.apache.org/jira/browse/BEAM-10015 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Reuven Lax Dataflow runner does not propagate the output timestamp into timer firing, resulting in incorrect default timestamps when outputting from a processTimer.
[jira] [Commented] (BEAM-9814) Error occurred when transforming from row to a new row without setCoder
[ https://issues.apache.org/jira/browse/BEAM-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17108644#comment-17108644 ] Reuven Lax commented on BEAM-9814: -- In all these cases, you need to call setRowSchema on the PCollection. I plan on adding convenience transforms to make this easier, so you won't forget to make that call. > Error occurred when transforming from row to a new row without setCoder > --- > > Key: BEAM-9814 > URL: https://issues.apache.org/jira/browse/BEAM-9814 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.19.0 >Reporter: Ruixue Liao >Assignee: Reuven Lax >Priority: Major > > The output row from the transform function is validated against the input row schema, > which causes an error. Ex: > {code} > .apply(MapElements.via( > new SimpleFunction<Row, Row>() { > @Override > public Row apply(Row line) { > return Row.withSchema(newSchema).addValues("a", 1, > "b").build(); > } > })); > {code} > Got an error when the output row schema is not the same as the input row's. > Need to add {{.setCoder(RowCoder.of(newSchema))}} after the transform > function to make it work. > Related link: > [https://stackoverflow.com/questions/61236972/beam-sql-udf-to-split-one-column-into-multiple-columns]
[jira] [Commented] (BEAM-9863) AvroUtils is converting incorrectly LogicalType Timestamps from long into Joda DateTimes
[ https://issues.apache.org/jira/browse/BEAM-9863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097534#comment-17097534 ] Reuven Lax commented on BEAM-9863: -- My thought too. It might be worth looking up the commit that originally added this change to see what the context was. > AvroUtils is converting incorrectly LogicalType Timestamps from long into > Joda DateTimes > > > Key: BEAM-9863 > URL: https://issues.apache.org/jira/browse/BEAM-9863 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0, 2.21.0 >Reporter: Ismaël Mejía >Assignee: Reuven Lax >Priority: Major > > Copied from the mailing list report: > I think the method AvroUtils.toBeamSchema has an unexpected side effect. > I found out that if you invoke it and then run a pipeline of > GenericRecords containing a timestamp (I tried with logical-type > timestamp-millis), Beam converts such timestamps from long to > org.joda.time.DateTime, even if you don't apply any transformation to the > pipeline. > Do you think it's a bug? > More details on how to reproduce here: > https://lists.apache.org/thread.html/r43fb2896e496b7493a962207eb3b95360abc30b9d091b26f110264d0%40%3Cuser.beam.apache.org%3E
[jira] [Commented] (BEAM-9759) Pipeline creation with large number of shards/streams takes long time
[ https://issues.apache.org/jira/browse/BEAM-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17095092#comment-17095092 ] Reuven Lax commented on BEAM-9759: -- The best solution would be to create a new SplittableDoFn version of the Kinesis source. This would have several advantages: 1. It could support dynamic changes (at run time) of the list of Kinesis streams. I believe this is a major reason that you currently need to update the pipeline so often, and this would remove that need. 2. The splitting could then happen at run time instead of graph-construction time, and could also be parallelized. > Pipeline creation with large number of shards/streams takes long time > - > > Key: BEAM-9759 > URL: https://issues.apache.org/jira/browse/BEAM-9759 > Project: Beam > Issue Type: Improvement > Components: io-java-kinesis, runner-dataflow >Affects Versions: 2.19.0 >Reporter: Sebastian Graca >Priority: Major > > We are processing multiple Kinesis streams using pipelines running on > {{DataflowRunner}}. Starting such a pipeline from a pipeline > definition (execution of the {{org.apache.beam.sdk.Pipeline.run()}} method) takes > a considerable amount of time. In our case: > * a pipeline that consumes data from 196 streams (237 shards in total) > starts in 7 minutes > * a pipeline that consumes data from 111 streams (261 shards in total) > starts in 4 minutes > I've been investigating this and found out that when {{Pipeline.run}} is > invoked, the whole pipeline graph is traversed and serialized so it can be > passed to the Dataflow backend.
Here's part of the stacktrace that shows this > traversal: > {code:java} > at > com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252) > at > org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137) > at > org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210) > at > org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134) > at > org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119) > at > org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195) > at > org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115) > at > org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59) > at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88) > at > org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87) > at > org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51) > at > org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630) > at > org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251) > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192) > at > org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795) > at > org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301) > {code} > As you can see, during serialization, the > {{org.apache.beam.sdk.io.kinesis.KinesisSource.split}} method is called. This > method finds all shards for the stream and also validates each shard by > reading from it. As this process is sequential, it takes considerable time > that is dependent both on the number of streams
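Point 2 of the comment notes that the splitting could be parallelized. Per the report, each shard is validated by reading from it, one after another, so start-up time grows with the shard count. The sketch below shows only the concurrency idea in plain Java, using a fixed-size ExecutorService; ShardValidator and validShards are hypothetical names, not KinesisIO APIs, and the sketch does not model the SplittableDoFn redesign the comment proposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: validate shards concurrently instead of one after another. */
final class ParallelShardValidation {
  private ParallelShardValidation() {}

  /** Stand-in for a per-shard check such as one remote read; hypothetical, not a KinesisIO API. */
  interface ShardValidator {
    boolean isValid(String shardId) throws Exception;
  }

  /** Returns the shards that pass validation, in input order, running up to {@code threads} checks at once. */
  static List<String> validShards(List<String> shardIds, ShardValidator validator, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Boolean>> results = new ArrayList<>();
      for (String id : shardIds) {
        results.add(pool.submit(() -> validator.isValid(id)));
      }
      List<String> valid = new ArrayList<>();
      for (int i = 0; i < shardIds.size(); i++) {
        if (results.get(i).get()) { // blocks until that shard's check has finished
          valid.add(shardIds.get(i));
        }
      }
      return valid;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while validating shards", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("shard validation failed", e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    List<String> shards = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      shards.add("shard-" + i);
    }
    long start = System.nanoTime();
    // Simulate a 100 ms remote call per shard; sequentially this would take about 2 s.
    List<String> valid = validShards(shards, id -> { Thread.sleep(100); return true; }, 10);
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println(valid.size() + " shards validated in " + elapsedMs + " ms");
  }
}
```

Results are collected in input order, so the output is deterministic regardless of which checks finish first.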
[jira] [Commented] (BEAM-6566) SqlTransform does not work for beam version above 2.6.0 if RowCoder explicitly chosen
[ https://issues.apache.org/jira/browse/BEAM-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17094666#comment-17094666 ] Reuven Lax commented on BEAM-6566: -- We've redefined RowCoder to be SchemaCoder, so this should work. However, it's preferable to call setRowSchema(schema) on the PCollection instead. > SqlTransform does not work for beam version above 2.6.0 if RowCoder > explicitly chosen > - > > Key: BEAM-6566 > URL: https://issues.apache.org/jira/browse/BEAM-6566 > Project: Beam > Issue Type: Bug > Components: dsl-sql >Affects Versions: 2.7.0, 2.8.0, 2.9.0, 2.10.0 >Reporter: Xuefeng Zhang >Assignee: Reuven Lax >Priority: Critical > Time Spent: 1h > Remaining Estimate: 0h > > *Issue*: > Beam versions above 2.6.0 do not work for SqlTransform. Looking at the > code, those versions use PCollection.getSchema, and this function never works, > even in 2.6.0. > *Details:* > Beam 2.6.0, class BeamPCollectionTable, which is used by SqlTransform: > public BeamPCollectionTable(PCollection<Row> upstream) { super(((RowCoder) > upstream.getCoder()).getSchema()); this.upstream = upstream; } > But in Beam 2.7.0 and 2.8.0, it was changed to: > https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java > This produced the errors below after upgrading Beam from 2.6.0 to 2.9.0: > java.lang.IllegalStateException: Cannot call getSchema when there is no schema > at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328) > at > org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34) > at > org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:111) > at > org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:91) > at > org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76) > at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) > at
org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471) > at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357) > *Code:* > Schema schema = Schema.builder() > .addStringField("weightMarketValue") > .addStringField("ticker") > .addStringField("ratingLongTermFitchRaw") > .build(); > Row row = Row.withSchema(schema) > .addValues("weightMarketValue 1", "ticker 1", "ratingLongTermFitchRaw 1") > .build(); > Version 1: > PCollection<Row> input = p.apply(Create.of(row) > .withSchema(schema, SerializableFunctions.identity(), > SerializableFunctions.identity()) > .withCoder(RowCoder.of(schema))); > PCollection<Row> output = input.apply(SqlTransform.query("select * from > PCOLLECTION")); > Version 2: > PCollection<Row> input = p.apply(Create.of(row) > .withRowSchema(schema) > .withCoder(RowCoder.of(schema))); > PCollection<Row> output = input.apply(SqlTransform.query("select * from > PCOLLECTION"));
[jira] [Commented] (BEAM-9613) BigQuery IO not support convert double type for beam row
[ https://issues.apache.org/jira/browse/BEAM-9613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091742#comment-17091742 ] Reuven Lax commented on BEAM-9613: -- JSON is a textual format. The fact that the Java JSON object sometimes stores objects of Java type Long is simply a performance optimization. In order to use this value (e.g. to write it back to BigQuery), the Long is converted back to a string anyway. By the same token, we could store ByteArray objects in the TableRow, but these would have to be base64-encoded into a string before actually being sent back to BigQuery. I'm not sure who wrote the code to parse TableRows into schema row objects. However, if we expect to get Long, Double, etc. objects in the TableRow, then this mapping code needs to handle those objects. Handling them directly would be more efficient; converting to a String would simply be a stopgap "one-line" fix. > BigQuery IO not support convert double type for beam row > > > Key: BEAM-9613 > URL: https://issues.apache.org/jira/browse/BEAM-9613 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: MAKSIM TSYGAN >Priority: Major > > If I execute a query with a double column via BigQueryIO.readFrom(), I get this > exception: > Caused by: java.lang.UnsupportedOperationException: Converting BigQuery type > 'class java.lang.Double' to 'FieldType{typeName=DOUBLE, nullable=true, > logicalType=null, collectionElementType=null, mapKeyType=null, > mapValueType=null, rowSchema=null, metadata={}}' is not supported > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamValue(BigQueryUtils.java:532) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRowFieldValue(BigQueryUtils.java:483) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.lambda$toBeamRow$6(BigQueryUtils.java:469) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at >
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRow(BigQueryUtils.java:470)
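The comment's suggestion is that the TableRow-to-Row mapping should accept the typed Java objects it can actually receive (Long, Double, etc.) rather than assuming strings. A minimal sketch for a DOUBLE field follows; TableCellConversion and its methods are hypothetical illustrations, not BigQueryUtils' actual code. toDoubleViaString is the one-line stopgap, which works for any value whose toString parses as a number but pays for a round trip through text.

```java
/** Hypothetical sketch of converting one TableRow cell to a DOUBLE field value. */
final class TableCellConversion {
  private TableCellConversion() {}

  /** Accepts both already-typed (e.g. Avro-derived) and textual (JSON-style) cell values. */
  static double toDouble(Object cell) {
    if (cell instanceof Number) {
      // Already a Double, Long, Integer, ...: use it directly, no text round trip.
      return ((Number) cell).doubleValue();
    }
    if (cell instanceof String) {
      // Textual form, as JSON would carry it.
      return Double.parseDouble((String) cell);
    }
    throw new UnsupportedOperationException(
        "Cannot convert " + (cell == null ? "null" : cell.getClass().getName()) + " to DOUBLE");
  }

  /** The stopgap: format whatever object arrived as text, then parse it. */
  static double toDoubleViaString(Object cell) {
    return Double.parseDouble(cell.toString());
  }

  public static void main(String[] args) {
    System.out.println(toDouble(1.5));    // boxed Double
    System.out.println(toDouble(42L));    // boxed Long
    System.out.println(toDouble("2.25")); // JSON-style string
    System.out.println(toDoubleViaString(1.5));
  }
}
```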
[jira] [Commented] (BEAM-9696) UnionCoder IndexOutOfBoundsException in schema-driven join transform
[ https://issues.apache.org/jira/browse/BEAM-9696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091735#comment-17091735 ] Reuven Lax commented on BEAM-9696: -- Is there an easy repro to run? This failure is happening deep inside CoGroupByKey. CGBK seems to have claimed that an element is from the third tag input; however, there are only two inputs. The input index is encoded in the CGBK element using CGBK's union coder. Looking at the code in CoGroupByKey.java, this appears to be impossible, though it's possible that there's some weird latent bug in CoGroupByKey that we have triggered. We would need a repro to debug. > UnionCoder IndexOutOfBoundsException in schema-driven join transform > > > Key: BEAM-9696 > URL: https://issues.apache.org/jira/browse/BEAM-9696 > Project: Beam > Issue Type: Bug > Components: dsl-sql-zetasql, sdk-java-core >Reporter: Andrew Pilloud >Assignee: Reuven Lax >Priority: Minor > Labels: zetasql-compliance > > One failure in shard 17: > {code} > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.lang.IndexOutOfBoundsException: Index: 2, Size: 2 > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348) > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318) > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303) > at > org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter.runCollector(BeamEnumerableConverter.java:201) > at > org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter.collectRows(BeamEnumerableConverter.java:218) > at > org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter.toRowList(BeamEnumerableConverter.java:150) > at >
org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter.toRowList(BeamEnumerableConverter.java:127) > at > cloud.dataflow.sql.ExecuteQueryServiceServer$SqlComplianceServiceImpl.executeQuery(ExecuteQueryServiceServer.java:329) > at > com.google.zetasql.testing.SqlComplianceServiceGrpc$MethodHandlers.invoke(SqlComplianceServiceGrpc.java:423) > at > com.google.zetasql.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171) > at > com.google.zetasql.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283) > at > com.google.zetasql.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:711) > at > com.google.zetasql.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > com.google.zetasql.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 2 > at java.util.ArrayList.rangeCheck(ArrayList.java:658) > at java.util.ArrayList.get(ArrayList.java:434) > at > org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83) > at > org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82) > at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92) > at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141) > at > 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115) > at > org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46) > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112) > at > org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:299) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:259) > at >
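For readers unfamiliar with how CoGroupByKey tags its inputs: each element is encoded together with the index of the input it came from, and decoding uses that index to pick the coder for that input. As we read the stack trace, that lookup is the ArrayList.get call under UnionCoder.decode, which is where an out-of-range index surfaces as "Index: 2, Size: 2". The sketch below is a hypothetical, simplified stand-in, not Beam's UnionCoder: it uses a fixed-width int tag and a single String value type, and it checks the index explicitly to produce a clearer error.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified tagged-union encoding: input index first, then the value. */
final class TaggedUnion {
  final int index;
  final String value;

  TaggedUnion(int index, String value) {
    this.index = index;
    this.value = value;
  }

  static byte[] encode(TaggedUnion union) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(union.index); // which input this element came from
      out.writeUTF(union.value); // a real union coder would use that input's element coder here
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes one element, rejecting an index outside the inputs this coder was built for. */
  static TaggedUnion decode(byte[] encoded, int numInputs) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      int index = in.readInt();
      if (index < 0 || index >= numInputs) {
        // The situation behind "Index: 2, Size: 2": the tag names an input that does not exist.
        throw new IllegalStateException(
            "union index " + index + " out of range for " + numInputs + " inputs");
      }
      return new TaggedUnion(index, in.readUTF());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    TaggedUnion ok = decode(encode(new TaggedUnion(1, "right-side row")), 2);
    System.out.println(ok.index + " " + ok.value);
    try {
      decode(encode(new TaggedUnion(2, "bogus")), 2);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```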
[jira] [Commented] (BEAM-9814) Error occurred when transforming from row to a new row without setCoder
[ https://issues.apache.org/jira/browse/BEAM-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091725#comment-17091725 ] Reuven Lax commented on BEAM-9814: -- But we should add a schema-aware version of MapElements. Something like: MapElements.into(outputSchema).via(row -> processRow(row)); > Error occurred when transforming from row to a new row without setCoder > --- > > Key: BEAM-9814 > URL: https://issues.apache.org/jira/browse/BEAM-9814 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.19.0 >Reporter: Ruixue Liao >Assignee: Reuven Lax >Priority: Major > > The output row from the transform function is validated against the input row schema, > which causes an error. Ex: > {code} > .apply(MapElements.via( > new SimpleFunction<Row, Row>() { > @Override > public Row apply(Row line) { > return Row.withSchema(newSchema).addValues("a", 1, > "b").build(); > } > })); > {code} > Got an error when the output row schema is not the same as the input row's. > Need to add {{.setCoder(RowCoder.of(newSchema))}} after the transform > function to make it work. > Related link: > [https://stackoverflow.com/questions/61236972/beam-sql-udf-to-split-one-column-into-multiple-columns]
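The proposal is to declare the output schema together with the mapping function, so the output's schema never has to be inferred from the input. The sketch below models only that idea in plain Java: ToyRow, SchemaAwareMap, into and via are hypothetical stand-ins (a "schema" here is just a list of field names), not Beam's Row, Schema or MapElements. In actual Beam code, the fix given in the other comments on this issue is to call setRowSchema(newSchema) on the output PCollection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a row with a schema; the schema is an ordered list of field names. */
final class ToyRow {
  final List<String> schema;
  final List<Object> values;

  ToyRow(List<String> schema, Object... values) {
    if (schema.size() != values.length) {
      throw new IllegalArgumentException(
          "expected " + schema.size() + " values, got " + values.length);
    }
    this.schema = schema;
    this.values = Arrays.asList(values);
  }
}

/**
 * Hypothetical model of the proposed into(outputSchema).via(fn): the output schema is declared
 * up front, and each output row is checked against it rather than against the input schema.
 */
final class SchemaAwareMap {
  private final List<String> outputSchema;
  private final UnaryOperator<ToyRow> fn;

  private SchemaAwareMap(List<String> outputSchema, UnaryOperator<ToyRow> fn) {
    this.outputSchema = outputSchema;
    this.fn = fn;
  }

  static SchemaAwareMap into(List<String> outputSchema) {
    return new SchemaAwareMap(outputSchema, UnaryOperator.identity());
  }

  SchemaAwareMap via(UnaryOperator<ToyRow> fn) {
    return new SchemaAwareMap(outputSchema, fn);
  }

  List<ToyRow> apply(List<ToyRow> inputs) {
    List<ToyRow> outputs = new ArrayList<>();
    for (ToyRow input : inputs) {
      ToyRow output = fn.apply(input);
      if (!output.schema.equals(outputSchema)) {
        throw new IllegalStateException("row has schema " + output.schema
            + " but the declared output schema is " + outputSchema);
      }
      outputs.add(output);
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<String> inputSchema = Arrays.asList("line");
    List<String> newSchema = Arrays.asList("f0", "f1", "f2");
    List<ToyRow> result =
        SchemaAwareMap.into(newSchema)
            .via(row -> new ToyRow(newSchema, "a", 1, "b"))
            .apply(Arrays.asList(new ToyRow(inputSchema, "a,1,b")));
    System.out.println(result.get(0).schema + " " + result.get(0).values);
  }
}
```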
[jira] [Commented] (BEAM-9814) Error occurred when transforming from row to a new row without setCoder
[ https://issues.apache.org/jira/browse/BEAM-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091721#comment-17091721 ] Reuven Lax commented on BEAM-9814: -- Better to use setRowSchema(newSchema). You shouldn't manipulate RowCoder yourself most of the time. > Error occurred when transforming from row to a new row without setCoder > --- > > Key: BEAM-9814 > URL: https://issues.apache.org/jira/browse/BEAM-9814 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 2.19.0 >Reporter: Ruixue Liao >Assignee: Reuven Lax >Priority: Major > > The output row from the transform function is validated against the input row schema, > which causes an error. Ex: > {code} > .apply(MapElements.via( > new SimpleFunction<Row, Row>() { > @Override > public Row apply(Row line) { > return Row.withSchema(newSchema).addValues("a", 1, > "b").build(); > } > })); > {code} > Got an error when the output row schema is not the same as the input row's. > Need to add {{.setCoder(RowCoder.of(newSchema))}} after the transform > function to make it work. > Related link: > [https://stackoverflow.com/questions/61236972/beam-sql-udf-to-split-one-column-into-multiple-columns]
[jira] [Commented] (BEAM-9613) BigQuery IO not support convert double type for beam row
[ https://issues.apache.org/jira/browse/BEAM-9613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091720#comment-17091720 ] Reuven Lax commented on BEAM-9613: -- A simple fix for the bug experienced here would be for the conversion function to call toString on the object before processing it. > BigQuery IO not support convert double type for beam row > > > Key: BEAM-9613 > URL: https://issues.apache.org/jira/browse/BEAM-9613 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: MAKSIM TSYGAN >Priority: Major > > If I execute a query with a double column via BigQueryIO.readFrom(), I get this > exception: > Caused by: java.lang.UnsupportedOperationException: Converting BigQuery type > 'class java.lang.Double' to 'FieldType{typeName=DOUBLE, nullable=true, > logicalType=null, collectionElementType=null, mapKeyType=null, > mapValueType=null, rowSchema=null, metadata={}}' is not supported > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamValue(BigQueryUtils.java:532) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRowFieldValue(BigQueryUtils.java:483) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.lambda$toBeamRow$6(BigQueryUtils.java:469) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRow(BigQueryUtils.java:470)
[jira] [Commented] (BEAM-9613) BigQuery IO not support convert double type for beam row
[ https://issues.apache.org/jira/browse/BEAM-9613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091719#comment-17091719 ] Reuven Lax commented on BEAM-9613: -- TableRow is a json object, so fundamentally its values are interpreted as strings. It looks like Beam does BQ exports in Avro files, and then converts the Avro to TableRow objects. If you look at BigQueryUtils.convertAvroPrimitiveType, it converts the values into primitive objects (Byte, Long, Double, etc.), not into String objects. I'm assuming this was done to save on memory usage in the worker. > BigQuery IO not support convert double type for beam row > > > Key: BEAM-9613 > URL: https://issues.apache.org/jira/browse/BEAM-9613 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: MAKSIM TSYGAN >Priority: Major > > If execute query with double column via BigQueryIO.readFrom(), I get > exception: > Caused by: java.lang.UnsupportedOperationException: Converting BigQuery type > 'class java.lang.Double' to 'FieldType\{typeName=DOUBLE, nullable=true, > logicalType=null, collectionElementType=null, mapKeyType=null, > mapValueType=null, rowSchema=null, metadata={}}' is not supported > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamValue(BigQueryUtils.java:532) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRowFieldValue(BigQueryUtils.java:483) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.lambda$toBeamRow$6(BigQueryUtils.java:469) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRow(BigQueryUtils.java:470) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9613) BigQuery IO not support convert double type for beam row
[ https://issues.apache.org/jira/browse/BEAM-9613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17091120#comment-17091120 ] Reuven Lax commented on BEAM-9613: -- No, looks to be a limitation of BigQueryUtils. It currently assumes that primitive TableRow fields are always of type String and tries to parse the string (in this case with Double.valueOf). However, here the field has type Double. > BigQuery IO not support convert double type for beam row > > > Key: BEAM-9613 > URL: https://issues.apache.org/jira/browse/BEAM-9613 > Project: Beam > Issue Type: Bug > Components: io-java-gcp, sdk-java-core >Reporter: MAKSIM TSYGAN >Priority: Major > > If execute query with double column via BigQueryIO.readFrom(), I get > exception: > Caused by: java.lang.UnsupportedOperationException: Converting BigQuery type > 'class java.lang.Double' to 'FieldType\{typeName=DOUBLE, nullable=true, > logicalType=null, collectionElementType=null, mapKeyType=null, > mapValueType=null, rowSchema=null, metadata={}}' is not supported > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamValue(BigQueryUtils.java:532) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRowFieldValue(BigQueryUtils.java:483) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.lambda$toBeamRow$6(BigQueryUtils.java:469) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toBeamRow(BigQueryUtils.java:470) -- This message 
was sent by Atlassian Jira (v8.3.4#803005)
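Below is a simplified model of the limitation described above. The method names `parseAssumingString` and `convertDouble` are hypothetical and do not exist in Beam. The first method accepts only `String` values and rejects a `Double` with an error shaped like the one in the stack trace. The second accepts either representation by dispatching on the runtime type, which is an alternative to re-parsing via `toString()`.

```java
final class TableRowDoubleParsing {
  private TableRowDoubleParsing() {}

  // Models the assumption described above: primitive TableRow values are Strings.
  // A Double coming from the Avro export path falls through to the exception.
  static double parseAssumingString(Object raw) {
    if (raw instanceof String) {
      return Double.valueOf((String) raw);
    }
    throw new UnsupportedOperationException(
        "Converting BigQuery type '" + raw.getClass() + "' is not supported");
  }

  // Alternative: use numeric values directly and only parse Strings.
  static double convertDouble(Object raw) {
    if (raw instanceof Number) {
      return ((Number) raw).doubleValue();
    }
    if (raw instanceof String) {
      return Double.parseDouble((String) raw);
    }
    throw new IllegalArgumentException("Unsupported value type: " + raw.getClass());
  }
}
```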
[jira] [Commented] (BEAM-9370) ParDoSchemaTests fail when run with Java 11 on Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088839#comment-17088839 ] Reuven Lax commented on BEAM-9370: -- Sent pr/11480 > ParDoSchemaTests fail when run with Java 11 on Flink 1.10 > - > > Key: BEAM-9370 > URL: https://issues.apache.org/jira/browse/BEAM-9370 > Project: Beam > Issue Type: Bug > Components: runner-flink, sdk-java-core, testing >Reporter: Ismaël Mejía >Priority: Minor > > When running the ValidatesRunner suite with Flink 1.10 (which supports > natively Java 11) it fails. For more details > https://scans.gradle.com/s/hkselnsuaartu/tests/failed > {quote}:runners:flink:1.10:validatesRunnerBatch » > org.apache.beam.sdk.transforms.ParDoSchemaTest » > testSchemaFieldSelectionUnboxing (0.309s) > java.lang.IllegalStateException: size = 2 > org.apache.beam.sdk.Pipeline$PipelineExecutionException > java.lang.IllegalStateException: size = 2 > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.FilterableList$AbstractBase.getOnly(FilterableList.java:139) > at > org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$ConvertValueForSetter.convertArray(ByteBuddyUtils.java:999) > at > org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$ConvertValueForSetter.convertArray(ByteBuddyUtils.java:945) > at > org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$TypeConversion.convert(ByteBuddyUtils.java:254) > at > org.apache.beam.sdk.schemas.utils.ConvertHelpers$ConvertPrimitiveInstruction.lambda$appender$0(ConvertHelpers.java:206) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$ForDefinedMethod$WithBody.applyCode(TypeWriter.java:713) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$ForDefinedMethod$WithBody.applyBody(TypeWriter.java:698) > at > 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$ForDefinedMethod.apply(TypeWriter.java:605) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$AccessBridgeWrapper.apply(TypeWriter.java:1271) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$Default$ForCreation.create(TypeWriter.java:5133) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$Default.make(TypeWriter.java:1933) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.subclass.SubclassDynamicTypeBuilder.make(SubclassDynamicTypeBuilder.java:225) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.subclass.SubclassDynamicTypeBuilder.make(SubclassDynamicTypeBuilder.java:198) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.DynamicType$Builder$AbstractBase.make(DynamicType.java:3411) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.DynamicType$Builder$AbstractBase$Delegator.make(DynamicType.java:3607) > at > org.apache.beam.sdk.schemas.utils.ConvertHelpers.getConvertPrimitive(ConvertHelpers.java:169) > at > org.apache.beam.sdk.transforms.DoFnSchemaInformation$UnboxingConversionFunction.getConversionFunction(DoFnSchemaInformation.java:242) > at > org.apache.beam.sdk.transforms.DoFnSchemaInformation$UnboxingConversionFunction.apply(DoFnSchemaInformation.java:235) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.schemaElement(SimpleDoFnRunner.java:487) > at > org.apache.beam.sdk.transforms.ParDoSchemaTest$15$DoFnInvoker.invokeProcessElement(Unknown > Source) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9370) ParDoSchemaTests fail when run with Java 11 on Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088809#comment-17088809 ] Reuven Lax commented on BEAM-9370: -- Yes, the problem is caused by this code: COLLECTION_TYPE .getDeclaredMethods().filter(ElementMatchers.named("toArray").and(ElementMatchers.takesArguments(1))).getOnly() It looks up a Collection.toArray method that takes a single argument. Java 11 added a new single-argument overload (toArray([IntFunction|https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/function/IntFunction.html] generator)), so the filter returns two methods and getOnly fails. I think this wasn't caught before because we don't have many tests that run on Java 11. Fixing this should be simple. We should replace the above with COLLECTION_TYPE .getDeclaredMethods().filter(ElementMatchers.named("toArray").and(ElementMatchers.takesArguments(arrayType))).getOnly() This will look up the specific overload instead of assuming that there is only one overload that takes a single argument. > ParDoSchemaTests fail when run with Java 11 on Flink 1.10 > - > > Key: BEAM-9370 > URL: https://issues.apache.org/jira/browse/BEAM-9370 > Project: Beam > Issue Type: Bug > Components: runner-flink, sdk-java-core, testing >Reporter: Ismaël Mejía >Priority: Minor > > When running the ValidatesRunner suite with Flink 1.10 (which supports > natively Java 11) it fails.
For more details > https://scans.gradle.com/s/hkselnsuaartu/tests/failed > {quote}:runners:flink:1.10:validatesRunnerBatch » > org.apache.beam.sdk.transforms.ParDoSchemaTest » > testSchemaFieldSelectionUnboxing (0.309s) > java.lang.IllegalStateException: size = 2 > org.apache.beam.sdk.Pipeline$PipelineExecutionException > java.lang.IllegalStateException: size = 2 > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.FilterableList$AbstractBase.getOnly(FilterableList.java:139) > at > org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$ConvertValueForSetter.convertArray(ByteBuddyUtils.java:999) > at > org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$ConvertValueForSetter.convertArray(ByteBuddyUtils.java:945) > at > org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$TypeConversion.convert(ByteBuddyUtils.java:254) > at > org.apache.beam.sdk.schemas.utils.ConvertHelpers$ConvertPrimitiveInstruction.lambda$appender$0(ConvertHelpers.java:206) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$ForDefinedMethod$WithBody.applyCode(TypeWriter.java:713) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$ForDefinedMethod$WithBody.applyBody(TypeWriter.java:698) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$ForDefinedMethod.apply(TypeWriter.java:605) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$MethodPool$Record$AccessBridgeWrapper.apply(TypeWriter.java:1271) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$Default$ForCreation.create(TypeWriter.java:5133) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.TypeWriter$Default.make(TypeWriter.java:1933) > at > 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.subclass.SubclassDynamicTypeBuilder.make(SubclassDynamicTypeBuilder.java:225) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.scaffold.subclass.SubclassDynamicTypeBuilder.make(SubclassDynamicTypeBuilder.java:198) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.DynamicType$Builder$AbstractBase.make(DynamicType.java:3411) > at > org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.dynamic.DynamicType$Builder$AbstractBase$Delegator.make(DynamicType.java:3607) > at > org.apache.beam.sdk.schemas.utils.ConvertHelpers.getConvertPrimitive(ConvertHelpers.java:169) > at > org.apache.beam.sdk.transforms.DoFnSchemaInformation$UnboxingConversionFunction.getConversionFunction(DoFnSchemaInformation.java:242) > at > org.apache.beam.sdk.transforms.DoFnSchemaInformation$UnboxingConversionFunction.apply(DoFnSchemaInformation.java:235) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.schemaElement(SimpleDoFnRunner.java:487) > at > org.apache.beam.sdk.transforms.ParDoSchemaTest$15$DoFnInvoker.invokeProcessElement(Unknown > Source) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
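The overload ambiguity can be reproduced with plain Java reflection. This sketch assumes reflection is a fair stand-in for the ByteBuddy matchers in the comment: it models `takesArguments(1)` versus `takesArguments(arrayType)` but does not call ByteBuddy itself. `Object[].class` plays the role of `arrayType`, because `toArray(T[])` erases to `toArray(Object[])`.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

final class ToArrayOverloads {
  private ToArrayOverloads() {}

  // Like named("toArray").and(takesArguments(1)): matches by arity only.
  // On Java 11+ this also picks up the new toArray(IntFunction) default method.
  static List<Method> byArity() {
    return Arrays.stream(Collection.class.getDeclaredMethods())
        .filter(m -> m.getName().equals("toArray") && m.getParameterCount() == 1)
        .collect(Collectors.toList());
  }

  // Like takesArguments(arrayType): matches the exact parameter type, so exactly
  // one method is found regardless of the JDK version.
  static List<Method> byParameterType() {
    return Arrays.stream(Collection.class.getDeclaredMethods())
        .filter(m -> m.getName().equals("toArray")
            && Arrays.equals(m.getParameterTypes(), new Class<?>[] {Object[].class}))
        .collect(Collectors.toList());
  }
}
```

On a Java 8 runtime `byArity()` returns one method. On Java 11 and later it returns two, which is what made `getOnly()` fail.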
[jira] [Commented] (BEAM-9569) Coder inference should be disabled for Row types
[ https://issues.apache.org/jira/browse/BEAM-9569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086155#comment-17086155 ] Reuven Lax commented on BEAM-9569: -- This failure is not related to POJOs. The actual problem is that MapElements (in the logRecords function) isn't setting an output schema, and we don't automatically propagate the input schema to the output in this case. For some reason Beam insists on having a Coder, even though the PCollection generated by logRecords is not used. In the past Beam "found" SerializableCoder as an alternative. SerializableCoder is not really a valid coder for Row, but since nobody ever consumed this PCollection, it didn't matter. A simple fix here would be to change the MapElements output type to Void. Even better, there is no reason for logRecords to return the input PCollection in this case, so this test should probably have used a logging ParDo that produces no output. > Coder inference should be disabled for Row types > > > Key: BEAM-9569 > URL: https://issues.apache.org/jira/browse/BEAM-9569 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.21.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Coder inference will pick up SerializableCoder, which is incorrect. We should always be using schemas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9670) CoGroup transform should allow widening nullability in key schemas
Reuven Lax created BEAM-9670: Summary: CoGroup transform should allow widening nullability in key schemas Key: BEAM-9670 URL: https://issues.apache.org/jira/browse/BEAM-9670 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Reuven Lax If the key field for one input has schema INT32 and the key field for the second input has schema NULLABLE[INT32], CoGroup fails - complaining that the key fields don't match. Instead CoGroup should widen the nullability restriction, creating a schema that allows either key input. -- This message was sent by Atlassian Jira (v8.3.4#803005)
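Here is a sketch of the proposed widening rule. It uses a hypothetical, simplified `KeyType` (a type name plus a nullable flag) rather than Beam's `Schema.FieldType`. The base types must still match, and the widened key type is nullable if either input's key is nullable, so it accepts keys from both inputs.

```java
import java.util.Objects;

final class KeyFieldWidening {
  private KeyFieldWidening() {}

  // Hypothetical stand-in for a key field type: base type name plus nullability.
  static final class KeyType {
    final String typeName;
    final boolean nullable;

    KeyType(String typeName, boolean nullable) {
      this.typeName = typeName;
      this.nullable = nullable;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof KeyType)) {
        return false;
      }
      KeyType other = (KeyType) o;
      return typeName.equals(other.typeName) && nullable == other.nullable;
    }

    @Override
    public int hashCode() {
      return Objects.hash(typeName, nullable);
    }
  }

  // INT32 and NULLABLE[INT32] widen to NULLABLE[INT32]; differing base types still fail.
  static KeyType widen(KeyType a, KeyType b) {
    if (!a.typeName.equals(b.typeName)) {
      throw new IllegalArgumentException(
          "Key types differ: " + a.typeName + " vs " + b.typeName);
    }
    return new KeyType(a.typeName, a.nullable || b.nullable);
  }
}
```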
[jira] [Commented] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17067014#comment-17067014 ] Reuven Lax commented on BEAM-9557: -- pr/11226 sent for review > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. > However, now, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message > is wrong, it says that "event time timer" but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066892#comment-17066892 ] Reuven Lax commented on BEAM-9557: -- ok, this seems simple. I'll send out a PR. > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. > However, now, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message > is wrong, it says that "event time timer" but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066364#comment-17066364 ] Reuven Lax commented on BEAM-9557: -- Ah yes, I was referring to processing-time timers. For event-time timers, I believe that we need to maintain the invariant that outputTimestamp <= timerTimestamp < window_end + allowed_lateness > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. > However, now, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message > is wrong, it says that "event time timer" but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
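The event-time invariant above can be expressed as a check. This sketch uses `java.time.Instant` and `Duration` in place of the Joda-Time types Beam uses, and it treats `window_end` literally as the exclusive upper bound written in the comment. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class EventTimeTimerCheck {
  private EventTimeTimerCheck() {}

  // outputTimestamp <= timerTimestamp < windowEnd + allowedLateness
  static boolean isValid(
      Instant outputTimestamp, Instant timerTimestamp, Instant windowEnd, Duration allowedLateness) {
    return !outputTimestamp.isAfter(timerTimestamp)
        && timerTimestamp.isBefore(windowEnd.plus(allowedLateness));
  }
}
```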
[jira] [Commented] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17066023#comment-17066023 ] Reuven Lax commented on BEAM-9557: -- I'm not sure why the simple solution I proposed does not work. For event-time timers, the default hold will stay exactly where it was. For processing-time timers, I think the commit referenced contained a typo, and that typo caused the bug; the default hold should always be inside the current window. > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. > However, now, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message > is wrong, it says that "event time timer" but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9578) Enumerating artifacts is too expensive in Java
[ https://issues.apache.org/jira/browse/BEAM-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17065247#comment-17065247 ] Reuven Lax commented on BEAM-9578: -- This is causing Java Precommit tests to be extremely flaky (often timing out), which is impacting development. > Enumerating artifacts is too expensive in Java > -- > > Key: BEAM-9578 > URL: https://issues.apache.org/jira/browse/BEAM-9578 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Luke Cwik >Assignee: Heejong Lee >Priority: Critical > Fix For: 2.21.0 > > > There are a lot of places (e.g. *ParDoTranslation#getParDoPayload*) which > effectively call *Environments#createOrGetDefaultEnvironment* which causes > [artifacts to be > computed|https://github.com/apache/beam/blob/fc6cef9972780ca6b7525d4aadd65a8344221f1b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java#L114]. > This leads to zipping directories for non-jar dependencies. > Similar problems may exist for Python/Go. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17065112#comment-17065112 ] Reuven Lax commented on BEAM-9557: -- This looks like it might be a typo? I think that checkArgument(!target.isAfter(windowExpiry)) should actually have been checkArgument(!outputTimestamp.isAfter(windowExpiry), > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. > However, now, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message > is wrong, it says that "event time timer" but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
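The difference between the two `checkArgument` conditions can be illustrated with the timestamps from the error message. In this hypothetical model (class `TimerOutputCheck`, `java.time` types instead of Joda-Time), the target of a processing-time timer (18:01:35) may legitimately fall past the window expiry (17:59:59.999), since such a timer previously just never fired. The output timestamp, here an assumed 17:59:00, is the value that must stay inside the window.

```java
import java.time.Instant;

final class TimerOutputCheck {
  private TimerOutputCheck() {}

  // Condition as written in the regressing commit: validates the firing target.
  // For a processing-time timer the target is wall-clock time, so this rejects
  // timers that would otherwise simply never fire.
  static boolean checkTarget(Instant target, Instant windowExpiry) {
    return !target.isAfter(windowExpiry);
  }

  // Condition proposed in the comment: validates the output timestamp (the hold).
  static boolean checkOutputTimestamp(Instant outputTimestamp, Instant windowExpiry) {
    return !outputTimestamp.isAfter(windowExpiry);
  }
}
```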
[jira] [Created] (BEAM-9569) Coder inference should be disabled for Row types
Reuven Lax created BEAM-9569: Summary: Coder inference should be disabled for Row types Key: BEAM-9569 URL: https://issues.apache.org/jira/browse/BEAM-9569 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Coder inference will pick up SerializableCoder, which is incorrect. We should always be using schemas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9568) Move Beam SQL to use the schema join transforms
Reuven Lax created BEAM-9568: Summary: Move Beam SQL to use the schema join transforms Key: BEAM-9568 URL: https://issues.apache.org/jira/browse/BEAM-9568 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9442) Schema Select does not properly handle nested nullable fields
Reuven Lax created BEAM-9442: Summary: Schema Select does not properly handle nested nullable fields Key: BEAM-9442 URL: https://issues.apache.org/jira/browse/BEAM-9442 Project: Beam Issue Type: Sub-task Components: sdk-java-harness Reporter: Reuven Lax A select of a nested field should be nullable if any of its parents are nullable. So for example, a select of "a.b" should return a field named b that is nullable if _either_ of a or b is nullable. Today we only examine b to see if the selected fields should be nullable. Also the Select transform itself does not properly check for null values, and throws NullPointerExceptions when some row values are null. -- This message was sent by Atlassian Jira (v8.3.4#803005)
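Both problems described above can be sketched in plain Java. The types here are hypothetical stand-ins (a list of `PathField`s for the selected path, nested `Map`s for row values), not Beam's `Select` implementation. Nullability is the OR over every field on the path, and the lookup returns `null` instead of throwing when an intermediate value is null.

```java
import java.util.Map;

final class NestedSelectNullability {
  private NestedSelectNullability() {}

  // Hypothetical path element: a field name plus its declared nullability.
  static final class PathField {
    final String name;
    final boolean nullable;

    PathField(String name, boolean nullable) {
      this.name = name;
      this.nullable = nullable;
    }
  }

  static PathField field(String name, boolean nullable) {
    return new PathField(name, nullable);
  }

  // Selecting "a.b" yields a nullable field if a OR b is nullable:
  // a null "a" means there is no "b" to return.
  static boolean selectedFieldNullable(PathField... path) {
    for (PathField f : path) {
      if (f.nullable) {
        return true;
      }
    }
    return false;
  }

  // Null-safe dotted lookup over nested maps: returns null rather than
  // throwing NullPointerException when an intermediate value is null.
  static Object selectValue(Map<String, ?> row, String dottedPath) {
    Object current = row;
    for (String part : dottedPath.split("\\.")) {
      if (current == null) {
        return null;
      }
      current = ((Map<?, ?>) current).get(part);
    }
    return current;
  }
}
```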
[jira] [Resolved] (BEAM-7333) Select need the ability to rename fields
[ https://issues.apache.org/jira/browse/BEAM-7333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-7333. -- Fix Version/s: 2.20.0 Resolution: Fixed > Select need the ability to rename fields > > > Key: BEAM-7333 > URL: https://issues.apache.org/jira/browse/BEAM-7333 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Priority: Major > Fix For: 2.20.0 > > > There needs to be a way to rename fields in Select - i.e. > Select("field1").as("field2"). > While in many cases the RenameFields transform could be used, that doesn't > help when a Select would return conflicting names - i.e. Select("a.c", > "b.c"). In this case the Select transform would fail, because the output > schema would have two fields of the same name. -- This message was sent by Atlassian Jira (v8.3.4#803005)
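This sketch shows why `Select("a.c", "b.c")` conflicts and how a rename resolves it. It assumes, as the description implies, that a selected nested field is named after its last path component. `outputNames` and the `renames` map are hypothetical; the map plays the role of the requested `.as(...)`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SelectOutputNames {
  private SelectOutputNames() {}

  // By default "a.c" produces an output field named "c". An entry in renames
  // (selected path -> new name) overrides that default.
  static List<String> outputNames(List<String> selectedPaths, Map<String, String> renames) {
    List<String> names = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    for (String path : selectedPaths) {
      String name = renames.getOrDefault(path, path.substring(path.lastIndexOf('.') + 1));
      if (!seen.add(name)) {
        throw new IllegalArgumentException("Duplicate output field name: " + name);
      }
      names.add(name);
    }
    return names;
  }
}
```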
[jira] [Created] (BEAM-9393) support schemas in state API
Reuven Lax created BEAM-9393: Summary: support schemas in state API Key: BEAM-9393 URL: https://issues.apache.org/jira/browse/BEAM-9393 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9353) ByteBuddy Schema code does not properly handle null values
Reuven Lax created BEAM-9353: Summary: ByteBuddy Schema code does not properly handle null values Key: BEAM-9353 URL: https://issues.apache.org/jira/browse/BEAM-9353 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9331) The Row object needs better builders
Reuven Lax created BEAM-9331: Summary: The Row object needs better builders Key: BEAM-9331 URL: https://issues.apache.org/jira/browse/BEAM-9331 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Users should be able to build a Row object by specifying field names. Desired syntax: Row.withSchema(schema) .withFieldName("field1", "value") .withFieldName("field2.field3", value) .build() Users should also have a builder that allows taking an existing row and changing specific fields. -- This message was sent by Atlassian Jira (v8.3.4#803005)
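Here is a minimal sketch of the requested builder behavior. It works on nested `Map`s instead of Beam's `Row` and does no schema validation. `NamedRowBuilder`, `create`, and `fromRow` are hypothetical names; only `withFieldName` follows the desired syntax above. Dotted names such as `"field2.field3"` walk into nested rows, and `fromRow` copies an existing row so specific fields can be changed without mutating the original.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NamedRowBuilder {
  private final Map<String, Object> values;

  private NamedRowBuilder(Map<String, Object> values) {
    this.values = values;
  }

  // Start from an empty row.
  static NamedRowBuilder create() {
    return new NamedRowBuilder(new LinkedHashMap<>());
  }

  // Start from a copy of an existing row, so specific fields can be changed.
  static NamedRowBuilder fromRow(Map<String, Object> row) {
    return new NamedRowBuilder(deepCopy(row));
  }

  // "field2.field3" walks into (or creates) the nested row "field2".
  @SuppressWarnings("unchecked")
  NamedRowBuilder withFieldName(String path, Object value) {
    String[] parts = path.split("\\.");
    Map<String, Object> current = values;
    for (int i = 0; i < parts.length - 1; i++) {
      current = (Map<String, Object>)
          current.computeIfAbsent(parts[i], k -> new LinkedHashMap<String, Object>());
    }
    current.put(parts[parts.length - 1], value);
    return this;
  }

  Map<String, Object> build() {
    return deepCopy(values);
  }

  @SuppressWarnings("unchecked")
  private static Map<String, Object> deepCopy(Map<String, Object> source) {
    Map<String, Object> copy = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : source.entrySet()) {
      Object v = e.getValue();
      copy.put(e.getKey(), v instanceof Map ? deepCopy((Map<String, Object>) v) : v);
    }
    return copy;
  }
}
```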
[jira] [Commented] (BEAM-8543) Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle
[ https://issues.apache.org/jira/browse/BEAM-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027918#comment-17027918 ] Reuven Lax commented on BEAM-8543: -- The problem is more general: if an input bundle contains both elements and timers, processing the elements does not affect the timers in the bundle. For example, assume an input bundle contains an element E and a timer T set for 12pm. While processing E, the user resets timer T to 1pm. Correct behavior would be to skip the timer in this bundle, as it's no longer eligible to fire; however, today it still fires. > Dataflow streaming timers are not strictly time ordered when set earlier > mid-bundle > --- > > Key: BEAM-8543 > URL: https://issues.apache.org/jira/browse/BEAM-8543 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.13.0 >Reporter: Jan Lukavský >Assignee: Kenneth Knowles >Priority: Major > > Let's suppose we have the following situation: > - stateful ParDo with two timers - timerA and timerB > - timerA is set for window.maxTimestamp() + 1 > - timerB is set anywhere between timerB.timestamp > - input watermark moves to BoundedWindow.TIMESTAMP_MAX_VALUE > Then the order of timers is as follows (correct): > - timerB > - timerA > But, if timerB sets another timer (say for timerB.timestamp + 1), then the > order of timers will be: > - timerB (timerB.timestamp) > - timerA (BoundedWindow.TIMESTAMP_MAX_VALUE) > - timerB (timerB.timestamp + 1) > Which is not ordered by timestamp. The reason for this is that when the input > watermark update is evaluated, the WatermarkManager.extractFiredTimers() will > produce both timerA and timerB. That would be correct, but when timerB sets > another timer, that breaks this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
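One way to model the expected behavior is to check, at delivery time, whether a timer firing captured when the bundle was built still matches the timer's current setting. This is a hypothetical model (class `BundleTimerModel`, with `long` values 12 and 13 standing in for 12pm and 1pm), not Dataflow's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BundleTimerModel {
  // Authoritative firing time per timer id, updated while elements are processed.
  private final Map<String, Long> currentTimers = new HashMap<>();
  private final List<String> fired = new ArrayList<>();

  void setTimer(String timerId, long timestamp) {
    currentTimers.put(timerId, timestamp);
  }

  // Deliver a firing captured when the bundle was created. If an element in the
  // same bundle reset or cleared the timer since then, the firing is stale and skipped.
  void deliverTimer(String timerId, long capturedTimestamp) {
    Long current = currentTimers.get(timerId);
    if (current != null && current == capturedTimestamp) {
      fired.add(timerId + "@" + capturedTimestamp);
      currentTimers.remove(timerId); // a fired timer is cleared unless set again
    }
  }

  List<String> firedTimers() {
    return new ArrayList<>(fired);
  }
}
```

In the scenario from the comment, the bundle carries a firing for T at 12, processing E resets T to 13, and the captured firing at 12 is then skipped.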
[jira] [Commented] (BEAM-1589) Add OnWindowExpiration method to Stateful DoFn
[ https://issues.apache.org/jira/browse/BEAM-1589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011071#comment-17011071 ] Reuven Lax commented on BEAM-1589: -- output timestamp support is now in (Flink support pending). I think we should just use the same timer that's used for garbage collection to fire OnWindowExpiration callbacks. In order to make this work, we need to modify this timer to have an output timestamp inside the window (maybe the end of the window?). > Add OnWindowExpiration method to Stateful DoFn > -- > > Key: BEAM-1589 > URL: https://issues.apache.org/jira/browse/BEAM-1589 > Project: Beam > Issue Type: New Feature > Components: runner-core, sdk-java-core >Reporter: Jingsong Lee >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > > See BEAM-1517 > This allows the user to do some work before the state's garbage collection. > It seems kind of annoying, but on the other hand forgetting to set a final > timer to flush state is probably data loss most of the time. > FlinkRunner does this work very simply, but other runners, such as > DirectRunner, need to traverse all the states to do this, and maybe it's a > little hard. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9010) BigQuery TableRow's size is toString().length() ?
[ https://issues.apache.org/jira/browse/BEAM-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001198#comment-17001198 ] Reuven Lax commented on BEAM-9010: -- Could you check whether the output of TableRowJsonCoder has changed as well? If it's the same, then we're OK, and the tests should use TableRowJsonCoder instead of toString(). > BigQuery TableRow's size is toString().length() ? > - > > Key: BEAM-9010 > URL: https://issues.apache.org/jira/browse/BEAM-9010 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow >Reporter: Tomo Suzuki >Priority: Minor > > The following tests failed when I tried to upgrade google-http-client from 1.28.0 > to 1.34.0: > {noformat} > org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOReadTest.testEstimatedSizeWithoutStreamingBuffer > org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOReadTest.testEstimatedSizeWithStreamingBuffer > org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtilTest.testInsertAll > {noformat} > [https://builds.apache.org/job/beam_PreCommit_Java_Commit/9288/#showFailuresLink] > h3. Reasons for the test failures > [org.apache.beam.sdk.io.gcp.testing.TableContainer|https://github.com/apache/beam/blob/6fa94c9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/TableContainer.java#L43] > and > [org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl|https://github.com/apache/beam/blob/c2f0d28/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L758] > rely on {{TableRow.toString().length()}} to calculate the size.
Example: > {code:java} > dataSize += row.toString().length(); > if (dataSize >= maxRowBatchSize > || rows.size() >= maxRowsPerBatch > || i == rowsToPublish.size() - 1) { > {code} > However, with [google-http-client's > PR#589|https://github.com/googleapis/google-http-java-client/pull/589/files#diff-914cd7ff18143b3d2398149e1cfb4f45R218], > the GenericData.toString output has changed since v1.29.0. > In old google-http-client 1.28.0, an example row's toString returned: > {noformat} > {f=[{v=foo}, {v=1234}]} > {noformat} > In new google-http-client 1.29.0 and higher, the same row's toString returns: > {noformat} > GenericData{classInfo=[f], {f=[GenericData{classInfo=[v], {v=foo}}, > GenericData{classInfo=[v], {v=1234}}]}} > {noformat} > h1. Question: > Is it right to rely on {{toString().length()}} in the BigQuery > classes? -- This message was sent by Atlassian Jira (v8.3.4#803005)
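Following the suggestion to measure rows with a stable encoding rather than toString(), here is a hypothetical helper that sizes a row as the UTF-8 length of a minimal JSON rendering. JsonSize is not a Beam or google-http-client class and is not TableRowJsonCoder; its escaping is deliberately minimal (quotes and backslashes only), so treat the result as an estimate.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical size estimator that does not depend on any library's toString() format. */
class JsonSize {
  static int estimate(Object value) {
    StringBuilder sb = new StringBuilder();
    write(value, sb);
    return sb.toString().getBytes(StandardCharsets.UTF_8).length;
  }

  private static void write(Object v, StringBuilder sb) {
    if (v == null) {
      sb.append("null");
    } else if (v instanceof Map) {
      sb.append('{');
      boolean first = true;
      for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
        if (!first) sb.append(',');
        first = false;
        writeString(String.valueOf(e.getKey()), sb);
        sb.append(':');
        write(e.getValue(), sb);
      }
      sb.append('}');
    } else if (v instanceof List) {
      sb.append('[');
      boolean first = true;
      for (Object item : (List<?>) v) {
        if (!first) sb.append(',');
        first = false;
        write(item, sb);
      }
      sb.append(']');
    } else if (v instanceof Number || v instanceof Boolean) {
      sb.append(v);
    } else {
      writeString(v.toString(), sb);
    }
  }

  // Minimal escaping: only quotes and backslashes; control characters are not handled.
  private static void writeString(String s, StringBuilder sb) {
    sb.append('"');
    for (char c : s.toCharArray()) {
      if (c == '"' || c == '\\') sb.append('\\');
      sb.append(c);
    }
    sb.append('"');
  }
}
```

For the example row {f=[{v=foo}, {v=1234}]} this renders {"f":[{"v":"foo"},{"v":"1234"}]}, which is 32 bytes whichever google-http-client version is on the classpath.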
[jira] [Commented] (BEAM-9010) BigQuery TableRow's size is toString().length() ?
[ https://issues.apache.org/jira/browse/BEAM-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001196#comment-17001196 ] Reuven Lax commented on BEAM-9010: -- This change could be a major performance regression for production users, as quotas I believe are based on the full JSON size. This could cause pipelines that used to work to suddenly stop working. We might have to hold off upgrading the library until this can be resolved. -- This message was sent by Atlassian Jira (v8.3.4#803005)
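If quotas are indeed based on the JSON payload size, the batching loop quoted in the issue would need to measure encoded rows. Below is a hypothetical sketch (RowBatcher and its parameter names are illustrative, and rows are assumed to be already JSON-encoded strings) that closes a batch before it would exceed either the byte limit or the row-count limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split encoded rows into batches bounded by total bytes and row count. */
class RowBatcher {
  static List<List<String>> batch(List<String> encodedRows, long maxBatchBytes, int maxRowsPerBatch) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (String row : encodedRows) {
      long size = row.getBytes(StandardCharsets.UTF_8).length;
      // Close the current batch first if adding this row would break either limit.
      if (!current.isEmpty()
          && (currentBytes + size > maxBatchBytes || current.size() >= maxRowsPerBatch)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(row);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

Unlike the quoted loop, which checks the size after adding a row, this version flushes before adding, so a batch only exceeds maxBatchBytes when a single row is itself larger than the limit.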
[jira] [Assigned] (BEAM-6756) Support lazy iterables in schemas
[ https://issues.apache.org/jira/browse/BEAM-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax reassigned BEAM-6756: Assignee: Reuven Lax (was: Shehzaad Nakhoda) > Support lazy iterables in schemas > - > > Key: BEAM-6756 > URL: https://issues.apache.org/jira/browse/BEAM-6756 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > The iterables returned by GroupByKey and CoGroupByKey are lazy; this allows a > runner to page data into memory if the full iterable is too large. We > currently don't support this in Schemas, so the Schema Group and CoGroup > transforms materialize all data into memory. We should add support for this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
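To illustrate what "lazy" means here, the following is a hypothetical plain-Java sketch (PagedIterable and its page loader are illustrative names, not Beam classes) of an Iterable that fetches pages only when iteration reaches them, so the full group never has to be materialized in memory.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Hypothetical lazily paged Iterable: page i is loaded only when iteration needs it. */
class PagedIterable<T> implements Iterable<T> {
  private final IntFunction<List<T>> loadPage; // returns page i, or an empty list when done
  int pagesLoaded = 0; // exposed for illustration only

  PagedIterable(IntFunction<List<T>> loadPage) {
    this.loadPage = loadPage;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int nextPage = 0;
      private Iterator<T> current = Collections.emptyIterator();
      private boolean exhausted = false;

      @Override
      public boolean hasNext() {
        // Fetch the next page only when the current one is used up.
        while (!current.hasNext() && !exhausted) {
          List<T> page = loadPage.apply(nextPage++);
          pagesLoaded++;
          if (page.isEmpty()) {
            exhausted = true;
          } else {
            current = page.iterator();
          }
        }
        return current.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

Reading only the first three elements of a group split into pages of two loads just two pages; a schema-aware Group or CoGroup would presumably need to hand such an Iterable through rather than copying it into a list.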
[jira] [Commented] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932718#comment-16932718 ] Reuven Lax commented on BEAM-3772: -- It's possible that there's a bug with time partitioning. If time-partitioned tables interact with create disposition differently, some assumptions might be breaking in the code. > BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded > PCollection and FILE_LOADS > > > Key: BEAM-3772 > URL: https://issues.apache.org/jira/browse/BEAM-3772 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Affects Versions: 2.2.0, 2.3.0 > Environment: Dataflow streaming pipeline >Reporter: Benjamin BENOIST >Assignee: Reuven Lax >Priority: Major > Attachments: bigquery-fail.png, bigquery-success.png, > image-2019-09-17-12-01-42-764.png > > > My workflow : KAFKA -> Dataflow streaming -> BigQuery > Given that having low-latency isn't important in my case, I use FILE_LOADS to > reduce the costs. I'm using _BigQueryIO.Write_ with a _DynamicDestination_, > which is a table with the current hour as a suffix. > This _BigQueryIO.Write_ is configured like this : > {code:java} > .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) > .withMethod(Method.FILE_LOADS) > .withTriggeringFrequency(triggeringFrequency) > .withNumFileShards(100) > {code} > The first table is successfully created and is written to. 
But then the > following tables are never created and I get these exceptions: > {code:java} > (99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job > with id prefix > 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_1_00023, > reached max retries: 3, last failed load job: { > "configuration" : { > "load" : { > "createDisposition" : "CREATE_NEVER", > "destinationTable" : { > "datasetId" : "dev_mydataset", > "projectId" : "myproject-id", > "tableId" : "mytable_20180302_16" > }, > {code} > The _CreateDisposition_ used is _CREATE_NEVER_, instead of the _CREATE_IF_NEEDED_ that was specified. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932698#comment-16932698 ] Reuven Lax commented on BEAM-3772: -- Are you calling withTimePartitioning on BigQueryIO? Also, are you using clustering?
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax reassigned BEAM-3772: Assignee: Reuven Lax (was: Chamikara Jayalath) -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929839#comment-16929839 ] Reuven Lax commented on BEAM-3772: -- As mentioned on the PR, triggers will be per key after a reshuffle, so it should be impossible for a table to skip a trigger. We have fixed some bugs in this code though that could have potentially caused this error, and this Jira mentions Beam 2.9. Can you try with a more-recent Beam version? Also what is the code for your DynamicDestinations - is it deterministic? -- This message was sent by Atlassian Jira (v8.3.2#803003)
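On the question of whether the DynamicDestinations is deterministic: one way to keep an hourly destination deterministic is to derive the suffix from the element's event timestamp in a fixed time zone rather than from the wall clock, so retries and different workers compute the same table. A minimal sketch; HourlyTableName is hypothetical, UTC is an assumption, and the suffix pattern is inferred from the table id mytable_20180302_16 in the error above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: hourly table name computed only from the event timestamp. */
class HourlyTableName {
  // Fixed zone, so the same instant always maps to the same suffix on every worker.
  private static final DateTimeFormatter HOUR_SUFFIX =
      DateTimeFormatter.ofPattern("yyyyMMdd_HH").withZone(ZoneOffset.UTC);

  static String forTimestamp(String baseTable, Instant eventTime) {
    return baseTable + "_" + HOUR_SUFFIX.format(eventTime);
  }
}
```

An event at 2018-03-02T16:42:00Z maps to mytable_20180302_16 no matter when or where it is processed; a name built from Instant.now() would not have that property.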
[jira] [Commented] (BEAM-8102) Portable Flink and Spark failing testSideInputAnnotation[WithMultipleSideInputs]
[ https://issues.apache.org/jira/browse/BEAM-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920193#comment-16920193 ] Reuven Lax commented on BEAM-8102: -- sent pr/9461 to fix > Portable Flink and Spark failing > testSideInputAnnotation[WithMultipleSideInputs] > > > Key: BEAM-8102 > URL: https://issues.apache.org/jira/browse/BEAM-8102 > Project: Beam > Issue Type: Bug > Components: java-fn-execution, runner-flink, runner-spark >Reporter: Kyle Weaver >Priority: Major > Labels: portability-flink, portability-spark > Time Spent: 10m > Remaining Estimate: 0h > > [https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/] > [https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/] > java.lang.AssertionError: Pipeline did not succeed. Expected: is but: > was -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (BEAM-8102) Portable Flink and Spark failing testSideInputAnnotation[WithMultipleSideInputs]
[ https://issues.apache.org/jira/browse/BEAM-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920190#comment-16920190 ] Reuven Lax commented on BEAM-8102: -- Taking a look. I believe this is an easy fix. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (BEAM-6858) Support side inputs injected into a DoFn
[ https://issues.apache.org/jira/browse/BEAM-6858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890370#comment-16890370 ] Reuven Lax commented on BEAM-6858: -- Inside ParDo, you will need to get the rest of the information of the PCollectionView, which means that somewhere a map of TupleTag -> PCollectionView needs to be maintained. [~kenn] do you have advice on where the best place to do this is? > Support side inputs injected into a DoFn > > > Key: BEAM-6858 > URL: https://issues.apache.org/jira/browse/BEAM-6858 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Shehzaad Nakhoda >Priority: Major > > Beam currently supports injecting main inputs into a DoFn process method. A > user can write the following: > @ProcessElement public void process(@Element InputT element) > And Beam will (using ByteBuddy code generation) inject the input element into > the process method. > We would like to also support the same for side inputs. For example: > @ProcessElement public void process(@Element InputT element, > @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) > This requires the existing process-method analysis framework to capture these > side inputs. The ParDo code would have to verify the type of the side input > and include them in the list of side inputs. This would also eliminate the > need for the user to explicitly call withSideInputs on the ParDo. > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
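The proposal relies on analyzing the process method's parameters and resolving each side input by its tag. The following is a hypothetical plain-Java sketch of that lookup using reflection instead of ByteBuddy code generation. SideInputTag is a stand-in annotation (not Beam's own class), the tag-to-value map plays the role of the TupleTag to PCollectionView map mentioned above, and primitive parameter types are not handled.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Map;

/** Hypothetical stand-in for the proposed parameter annotation (not Beam's own class). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface SideInputTag {
  String value();
}

/** Resolves tagged parameters from a tag -> value map by reflection and invokes the method. */
class SideInputInvoker {
  static Object invoke(Object fn, String methodName, Object element, Map<String, Object> sideInputs) {
    for (Method m : fn.getClass().getDeclaredMethods()) {
      if (!m.getName().equals(methodName)) {
        continue;
      }
      Parameter[] params = m.getParameters();
      Object[] args = new Object[params.length];
      for (int i = 0; i < params.length; i++) {
        SideInputTag tag = params[i].getAnnotation(SideInputTag.class);
        if (tag == null) {
          args[i] = element; // the untagged parameter receives the main-input element
          continue;
        }
        Object value = sideInputs.get(tag.value());
        // Type check against the declared parameter type (boxed types only in this sketch).
        if (value == null || !params[i].getType().isInstance(value)) {
          throw new IllegalArgumentException(
              "Side input '" + tag.value() + "' is missing or has the wrong type");
        }
        args[i] = value;
      }
      try {
        m.setAccessible(true);
        return m.invoke(fn, args);
      } catch (IllegalAccessException | InvocationTargetException e) {
        throw new IllegalStateException(e);
      }
    }
    throw new IllegalArgumentException("No method named " + methodName);
  }
}

/** Example process method written in the proposed style. */
class ExampleFn {
  String process(String element, @SideInputTag("tag1") String input1, @SideInputTag("tag2") Integer input2) {
    return element + ":" + input1 + ":" + input2;
  }
}
```

Calling invoke on ExampleFn with element "e" and side inputs tag1 = "a", tag2 = 2 returns "e:a:2"; supplying a String for tag2 is rejected by the type check before the method runs.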
[jira] [Commented] (BEAM-6858) Support side inputs injected into a DoFn
[ https://issues.apache.org/jira/browse/BEAM-6858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890365#comment-16890365 ] Reuven Lax commented on BEAM-6858: -- Some more details: We need to add a way for the user to control the name of the side input in the PCollectionView. I think we should add new overloads to the factory methods, e.g. PCollectionView<...> sideInput1 = ... .apply(View.asSingleton("sideInput1")); PCollectionView<List<...>> sideInput2 = ... .apply(View.asList("sideInput2")); These names can then be used to generate the side input. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (BEAM-6675) The JdbcIO sink should accept schemas
[ https://issues.apache.org/jira/browse/BEAM-6675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6675. -- Resolution: Fixed Fix Version/s: 2.14.0 > The JdbcIO sink should accept schemas > - > > Key: BEAM-6675 > URL: https://issues.apache.org/jira/browse/BEAM-6675 > Project: Beam > Issue Type: Sub-task > Components: io-java-jdbc >Reporter: Reuven Lax >Assignee: Shehzaad Nakhoda >Priority: Major > Fix For: 2.14.0 > > Time Spent: 7h 50m > Remaining Estimate: 0h > > If the input has a schema, there should be a default mapping to a > PreparedStatement for writing based on that schema. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
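A "default mapping to a PreparedStatement" could start from the schema's field names. A hypothetical sketch (InsertStatementBuilder is illustrative, not the JdbcIO implementation), assuming column names equal schema field names and need no quoting; field i would then bind to JDBC parameter i + 1, since JDBC parameters are 1-based.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: derive a parameterized INSERT from a schema's field names. */
class InsertStatementBuilder {
  static String insertFor(String table, List<String> fieldNames) {
    if (fieldNames.isEmpty()) {
      throw new IllegalArgumentException("schema has no fields");
    }
    String columns = String.join(", ", fieldNames);
    // One "?" placeholder per field, in schema order.
    String placeholders = String.join(", ", Collections.nCopies(fieldNames.size(), "?"));
    return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
  }
}
```

For a schema with fields id and name, this produces INSERT INTO users (id, name) VALUES (?, ?).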
[jira] [Commented] (BEAM-6855) Side inputs are not supported when using the state API
[ https://issues.apache.org/jira/browse/BEAM-6855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885853#comment-16885853 ] Reuven Lax commented on BEAM-6855: -- [~kenn] it was never addressed. Can you confirm that the described solution is the correct one? > Side inputs are not supported when using the state API > -- > > Key: BEAM-6855 > URL: https://issues.apache.org/jira/browse/BEAM-6855 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Shehzaad Nakhoda >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (BEAM-6855) Side inputs are not supported when using the state API
[ https://issues.apache.org/jira/browse/BEAM-6855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885670#comment-16885670 ] Reuven Lax commented on BEAM-6855: -- I believe that the side-input code in SimplePushbackSideInputDoFnRunner needs to be implemented in StatefulDoFnRunner. [~kenn] can you confirm? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
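As a rough illustration of the "push back" pattern referred to above, here is a hypothetical plain-Java model (PushbackModel is illustrative, not SimplePushbackSideInputDoFnRunner's actual code): elements whose window's side input is not yet available are returned for buffering instead of being processed, and are retried once the side input becomes ready. In a stateful runner the same check would presumably have to happen before any state is read or written.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical model of processing only elements whose side inputs are ready. */
class PushbackModel {
  static final class Element {
    final String value;
    final String window;

    Element(String value, String window) {
      this.value = value;
      this.window = window;
    }
  }

  private final Set<String> readyWindows = new HashSet<>();

  void markSideInputReady(String window) {
    readyWindows.add(window);
  }

  /** Processes elements in ready windows; returns the ones that must be pushed back. */
  List<Element> processOrPushBack(List<Element> input, Consumer<Element> process) {
    List<Element> pushedBack = new ArrayList<>();
    for (Element e : input) {
      if (readyWindows.contains(e.window)) {
        process.accept(e);
      } else {
        pushedBack.add(e); // buffered by the caller and retried later
      }
    }
    return pushedBack;
  }
}
```

With only w1 ready, an element in w1 is processed and an element in w2 is pushed back; once w2 is marked ready, retrying the pushed-back list processes it.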
[jira] [Resolved] (BEAM-6674) The JdbcIO source should produce schemas
[ https://issues.apache.org/jira/browse/BEAM-6674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6674. -- Resolution: Fixed Fix Version/s: 2.14.0 > The JdbcIO source should produce schemas > > > Key: BEAM-6674 > URL: https://issues.apache.org/jira/browse/BEAM-6674 > Project: Beam > Issue Type: Sub-task > Components: io-java-jdbc >Reporter: Reuven Lax >Assignee: Charith Ellawala >Priority: Major > Fix For: 2.14.0 > > Time Spent: 4h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
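Producing a schema from a JDBC source starts with the column types that ResultSetMetaData.getColumnType(int) reports as java.sql.Types codes. The mapping below is a hypothetical sketch: the target names mirror Beam schema type names, but this is not JdbcIO's actual table, and choices such as TINYINT to BYTE are assumptions that may not hold for unsigned columns.

```java
import java.sql.Types;

/** Hypothetical mapping from java.sql.Types codes to schema type names. */
class JdbcTypeMapping {
  static String schemaTypeFor(int sqlType) {
    switch (sqlType) {
      case Types.BOOLEAN:
      case Types.BIT:
        return "BOOLEAN";
      case Types.TINYINT:
        return "BYTE";
      case Types.SMALLINT:
        return "INT16";
      case Types.INTEGER:
        return "INT32";
      case Types.BIGINT:
        return "INT64";
      case Types.REAL:
        return "FLOAT";
      case Types.FLOAT: // JDBC FLOAT is double precision
      case Types.DOUBLE:
        return "DOUBLE";
      case Types.NUMERIC:
      case Types.DECIMAL:
        return "DECIMAL";
      case Types.CHAR:
      case Types.VARCHAR:
      case Types.LONGVARCHAR:
        return "STRING";
      case Types.TIMESTAMP:
        return "DATETIME";
      default:
        throw new IllegalArgumentException("Unsupported JDBC type: " + sqlType);
    }
  }
}
```

Unsupported codes fail loudly rather than silently falling back to a string type, so a missing mapping shows up when the schema is inferred.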
[jira] [Resolved] (BEAM-6673) BigQueryIO.Read should automatically produce schemas
[ https://issues.apache.org/jira/browse/BEAM-6673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6673. -- Resolution: Fixed Fix Version/s: 2.14.0 > BigQueryIO.Read should automatically produce schemas > > > Key: BEAM-6673 > URL: https://issues.apache.org/jira/browse/BEAM-6673 > Project: Beam > Issue Type: Sub-task > Components: io-java-gcp >Reporter: Reuven Lax >Assignee: Charith Ellawala >Priority: Major > Fix For: 2.14.0 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > The output PCollections should contain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7425) Reading BigQuery Table Data into Java Classes(Pojo) Directly
[ https://issues.apache.org/jira/browse/BEAM-7425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861204#comment-16861204 ] Reuven Lax commented on BEAM-7425: -- BTW the above works. It would be easy to combine into BigQueryIO if there's a need so you can avoid the Convert transform. > Reading BigQuery Table Data into Java Classes(Pojo) Directly > > > Key: BEAM-7425 > URL: https://issues.apache.org/jira/browse/BEAM-7425 > Project: Beam > Issue Type: New Feature > Components: io-java-avro, io-java-gcp >Affects Versions: 2.12.0 > Environment: Dataflow >Reporter: Kishan Kumar >Priority: Major > > While Developing my code I used the below code snippet to read the table data > from BigQuery. > > {code:java} > PCollection gpseEftReasonCodes = input > .apply("Reading xxyyzz", > BigQueryIO > .read(new ReadTable(ReasonCode.class)) > .withoutValidation() > .withTemplateCompatibility() > .fromQuery("Select * from dataset.xxyyzz") > .usingStandardSql() > .withCoder(SerializableCoder.of(xxyyzz.class)) > {code} > Read Table Class: > {code:java} > @DefaultSchema(JavaBeanSchema.class) > public class ReadTable implements SerializableFunction > { > private static final long serialVersionUID = 1L; > private static Gson gson = new Gson(); > public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class); > private final Counter countingRecords = > Metrics.counter(ReadTable.class, "Reading Records EFT Report"); > private Class class1; > > public ReadTable(Class class1) { this.class1 = class1; } > > public T apply(SchemaAndRecord schemaAndRecord) { > Map mapping = new HashMap<>(); > int counter = 0; > try { > GenericRecord s = schemaAndRecord.getRecord(); > org.apache.avro.Schema s1 = s.getSchema(); > for (Field f : s1.getFields()) { > counter++; > mapping.put(f.name(), null==s.get(f.name()) ? 
null : > String.valueOf(s.get(counter))); > } > countingRecords.inc(); > JsonElement jsonElement = gson.toJsonTree(mapping); > return gson.fromJson(jsonElement, class1); > } catch (Exception mp) { > LOG.error("Found Wrong Mapping for the Record: "+mapping); > mp.printStackTrace(); return null; } > } > } > {code} > After reading the data from BigQuery and mapping it from > SchemaAndRecord to a POJO, I got the following value for columns whose data type is > NUMERIC: > {code} > last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16] > {code} > I expected to get the exact value, but I got a HeapByteBuffer instead. > I am using Apache Beam 2.12.0. If any more information is > needed, please let me know. > Second approach tried: > {code:java} > GenericRecord s = schemaAndRecord.getRecord(); > org.apache.avro.Schema s1 = s.getSchema(); > for (Field f : s1.getFields()) { > counter++; > mapping.put(f.name(), null==s.get(f.name()) ? null : > String.valueOf(s.get(counter))); > if(f.name().equalsIgnoreCase("reason_code_id")) { > BigDecimal numericValue = new Conversions.DecimalConversion() >.fromBytes((ByteBuffer) s.get(f.name()), Schema.create(s1.getType()), > s1.getLogicalType()); >System.out.println("Numeric Con"+numericValue); > } else { > System.out.println("Else Condition "+f.name()); > } > {code} > Resulting error: > {code} > 2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: > RECORD > {code} > > It would be great to have a method that maps BigQuery data onto a > POJO schema: if the table has 10 columns in BQ and my POJO needs only > 5 of them, BigQueryIO should map only those 5 values > into the Java class and ignore the rest, which I currently do by hand with a lot of effort. > The NUMERIC data type should also be deserialized automatically when fetching data, as it is with > TableRow. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
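As far as we understand, BigQuery delivers NUMERIC in Avro as a decimal logical type: bytes holding the big-endian two's-complement unscaled value, with scale 9. That would explain why the field arrives as a HeapByteBuffer. The "Can't create a: RECORD" error is likely because Avro's Schema.create only accepts primitive types, and s1 is the record schema; a decimal conversion would need the field's own schema and logical type (for a nullable column, the non-null branch of the union). The plain-Java sketch below decodes such bytes without Avro. NumericDecoder is hypothetical, and the fixed scale of 9 is an assumption that real code should read from the field's logical type.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

/** Hypothetical decoder for an Avro decimal (two's-complement unscaled bytes plus a scale). */
class NumericDecoder {
  static final int NUMERIC_SCALE = 9; // assumed BigQuery NUMERIC scale

  static BigDecimal decode(ByteBuffer buffer, int scale) {
    ByteBuffer copy = buffer.duplicate(); // leave the caller's position untouched
    byte[] bytes = new byte[copy.remaining()];
    copy.get(bytes);
    // BigInteger(byte[]) reads big-endian two's complement, so sign-extended padding is fine.
    return new BigDecimal(new BigInteger(bytes), scale);
  }
}
```

For example, the unscaled value 123456000000 with scale 9 decodes to 123.456000000, and a 16-byte buffer padded with leading zero bytes decodes to the same value.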
[jira] [Created] (BEAM-7356) Select should support nested schema selectors
Reuven Lax created BEAM-7356: Summary: Select should support nested schema selectors Key: BEAM-7356 URL: https://issues.apache.org/jira/browse/BEAM-7356 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7116) Remove KV from Schema transforms
[ https://issues.apache.org/jira/browse/BEAM-7116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841777#comment-16841777 ] Reuven Lax commented on BEAM-7116: -- The problem is that Beam special cases KvCoder all over the place, so if we cause KV to use SchemaCoder we will break large parts of Beam. I think it will be easier to just remove KV from our interface and let any two-field schema translate to KV. However what you suggested is indeed a problem in Schema type inference - we don't do a good job with generic classes (someone trying AutoValueSchema hit this). Do you want to file a JIRA for this issue, as there doesn't appear to be one? *From: *Brian Hulette (JIRA) *Date: *Thu, May 16, 2019 at 1:38 PM *To: * > Remove KV from Schema transforms > > > Key: BEAM-7116 > URL: https://issues.apache.org/jira/browse/BEAM-7116 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Priority: Major > > Instead of returning KV objects, we should return a Schema with two fields. > The Convert transform should be able to convert these to KV objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7337) Allow selecting slices of arrays and maps
Reuven Lax created BEAM-7337: Summary: Allow selecting slices of arrays and maps Key: BEAM-7337 URL: https://issues.apache.org/jira/browse/BEAM-7337 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax It would be nice to allow selecting array slices, for instance Select.fieldNames("a.b[0:3].c"). Selecting specific keys from maps would be especially useful, e.g. Select.fieldNames("event.errors{accessDenied}.originator"). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
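The proposed selector syntax combines dotted field names, [start:end] array slices and {key} map-key selections. A hypothetical tokenizer sketch (SelectorParser and its field(...)/slice(...)/key(...) output labels are illustrative, not Beam's FieldAccessDescriptor parser) shows how such a string could be split into access steps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical tokenizer for selectors such as "a.b[0:3].c" or "m{key}.x". */
class SelectorParser {
  static List<String> parse(String selector) {
    List<String> parts = new ArrayList<>();
    StringBuilder name = new StringBuilder();
    int i = 0;
    while (i < selector.length()) {
      char c = selector.charAt(i);
      if (c == '.') {
        flush(name, parts);
        i++;
      } else if (c == '[' || c == '{') {
        flush(name, parts);
        char close = (c == '[') ? ']' : '}';
        int end = selector.indexOf(close, i);
        if (end < 0) {
          throw new IllegalArgumentException("Unclosed " + c + " in " + selector);
        }
        String inner = selector.substring(i + 1, end);
        // [..] selects an array slice, {..} selects a map key.
        parts.add(c == '[' ? "slice(" + inner + ")" : "key(" + inner + ")");
        i = end + 1;
      } else {
        name.append(c);
        i++;
      }
    }
    flush(name, parts);
    return parts;
  }

  private static void flush(StringBuilder name, List<String> parts) {
    if (name.length() > 0) {
      parts.add("field(" + name + ")");
      name.setLength(0);
    }
  }
}
```

"a.b[0:3].c" tokenizes to field(a), field(b), slice(0:3), field(c), and "event.errors{accessDenied}.originator" to field(event), field(errors), key(accessDenied), field(originator).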
[jira] [Resolved] (BEAM-6772) Select transform has non-intuitive semantics
[ https://issues.apache.org/jira/browse/BEAM-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6772. -- Resolution: Fixed Fix Version/s: 2.12.0 > Select transform has non-intuitive semantics > > > Key: BEAM-6772 > URL: https://issues.apache.org/jira/browse/BEAM-6772 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > Time Spent: 12h 10m > Remaining Estimate: 0h > > Consider the following schema: > User: > name: STRING > location: Location > > Location: > latitude: DOUBLE > longitude: DOUBLE > > If you apply Select.fieldNames("location"), most users expect to get back a > row matching the Location schema. Instead you get back an outer schema with a > single location field in it. Select should instead unnest the output up to > the point where multiple fields are selected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
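The unnesting rule described in the issue can be modeled with rows represented as Maps. A hypothetical sketch (SelectModel is illustrative, not Beam's Select): a single selected row-valued field is returned as that row itself, while several fields, or a single primitive field, produce a row of the selected fields. Keeping a single primitive field wrapped is this sketch's own choice.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the proposed Select semantics over rows represented as Maps. */
class SelectModel {
  static Object select(Map<String, Object> row, List<String> fieldNames) {
    if (fieldNames.size() == 1) {
      Object only = row.get(fieldNames.get(0));
      if (only instanceof Map) {
        // Selecting "location" yields the Location row itself, not a wrapper row
        // containing a single "location" field.
        return only;
      }
    }
    // Several fields (or one primitive field): build a row of the selected fields.
    Map<String, Object> out = new LinkedHashMap<>();
    for (String name : fieldNames) {
      out.put(name, row.get(name));
    }
    return out;
  }
}
```

Applied to the User/Location schema above, selecting "location" returns the {latitude, longitude} row directly.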
[jira] [Commented] (BEAM-7116) Remove KV from Schema transforms
[ https://issues.apache.org/jira/browse/BEAM-7116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841051#comment-16841051 ] Reuven Lax commented on BEAM-7116: -- To make this seamless, the Convert transform should be able to convert KVs to/from rows. Converting from row -> KV might require an override that tells Convert which field is the key and which is the value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
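The row/KV conversion described above, including the override naming the key field, can be sketched with ordered Maps standing in for two-field rows. RowKvConversion is hypothetical and not Beam's Convert transform; treating the first field as the key when no override is given is an assumption of this sketch.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of converting between two-field rows (ordered Maps) and key/value pairs. */
class RowKvConversion {
  /** Without an override, the first field is treated as the key. */
  static Map.Entry<Object, Object> toKv(LinkedHashMap<String, Object> row) {
    if (row.size() != 2) {
      throw new IllegalArgumentException("Expected exactly two fields, got " + row.keySet());
    }
    return toKv(row, row.keySet().iterator().next());
  }

  /** The override: the caller names the key field; the other field becomes the value. */
  static Map.Entry<Object, Object> toKv(LinkedHashMap<String, Object> row, String keyField) {
    if (row.size() != 2 || !row.containsKey(keyField)) {
      throw new IllegalArgumentException("Expected two fields including " + keyField + ", got " + row.keySet());
    }
    Object value = null;
    for (Map.Entry<String, Object> field : row.entrySet()) {
      if (!field.getKey().equals(keyField)) {
        value = field.getValue();
      }
    }
    return new AbstractMap.SimpleImmutableEntry<>(row.get(keyField), value);
  }

  /** Builds a two-field row from a key/value pair using caller-chosen field names. */
  static LinkedHashMap<String, Object> fromKv(Map.Entry<?, ?> kv, String keyField, String valueField) {
    if (keyField.equals(valueField)) {
      throw new IllegalArgumentException("Key and value fields need distinct names");
    }
    LinkedHashMap<String, Object> row = new LinkedHashMap<>();
    row.put(keyField, kv.getKey());
    row.put(valueField, kv.getValue());
    return row;
  }
}
```

For a row {user=ann, count=3}, toKv(row, "count") yields the pair (3, ann), while toKv(row) keys on the first field and yields (ann, 3).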
[jira] [Created] (BEAM-7336) KafkaIO should support inferring schemas when reading AVRO
Reuven Lax created BEAM-7336: Summary: KafkaIO should support inferring schemas when reading AVRO Key: BEAM-7336 URL: https://issues.apache.org/jira/browse/BEAM-7336 Project: Beam Issue Type: Sub-task Components: io-java-kafka Reporter: Reuven Lax PubSubIO already supports this. It would also be nice to be able to look up AVRO schemas in the Kafka schema registry. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
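Looking up Avro schemas in the Kafka schema registry would start from the schema ID embedded in each message. A minimal sketch, assuming the Confluent Schema Registry wire format (a zero magic byte, then a 4-byte big-endian schema ID, then the Avro-encoded payload); RegistryWireFormat is hypothetical, and the registry lookup itself is not shown.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical parser for the assumed registry wire format: magic byte, schema ID, payload. */
class RegistryWireFormat {
  static int schemaId(byte[] message) {
    if (message.length < 5 || message[0] != 0) {
      throw new IllegalArgumentException("Not in the expected registry wire format");
    }
    // Bytes 1-4 hold the schema ID; ByteBuffer reads big-endian by default.
    return ByteBuffer.wrap(message, 1, 4).getInt();
  }

  static byte[] payload(byte[] message) {
    // Everything after the 5-byte header is the Avro-encoded record.
    return Arrays.copyOfRange(message, 5, message.length);
  }
}
```

The returned ID would then be resolved against the registry to obtain the writer schema, from which a Beam schema could be inferred.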
[jira] [Created] (BEAM-7335) PubSubIO.writeAvros should infer beam schemas
Reuven Lax created BEAM-7335: Summary: PubSubIO.writeAvros should infer beam schemas Key: BEAM-7335 URL: https://issues.apache.org/jira/browse/BEAM-7335 Project: Beam Issue Type: Sub-task Components: io-java-gcp Reporter: Reuven Lax If the input PCollection has a schema, we should automatically convert it to AVRO. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7334) Create a better Schema builder
Reuven Lax created BEAM-7334: Summary: Create a better Schema builder Key: BEAM-7334 URL: https://issues.apache.org/jira/browse/BEAM-7334 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Right now the Schema builder class is a bit verbose to use. The AddFields transform already contains a nicer way of building schemas, but it's coupled to that transform. We should refactor that code and make it available in the Schema builder. That would allow code of the form Schema schema = Schema.builder() .addField("a", INT32) .addField("a.b", STRING) .addField("a.c", INT32).build(); which is much more concise for nested fields. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
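The nested-field builder idea can be sketched with a small class. NestedSchemaBuilder is hypothetical, not a Beam API. The sketch assumes types are plain strings and that each dotted path creates intermediate row fields on demand. One design choice: adding a nested path under a name that already holds a primitive type is rejected, rather than silently turning that field into a row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical builder (not Beam's Schema.Builder): dotted names such as "a.b" create the
 * intermediate row field "a" on first use. Types are plain strings for illustration.
 */
final class NestedSchemaBuilder {
  private final Map<String, Object> root = new LinkedHashMap<>();

  NestedSchemaBuilder addField(String path, String type) {
    String[] parts = path.split("\\.");
    Map<String, Object> current = root;
    for (int i = 0; i < parts.length - 1; i++) {
      // Create the intermediate row on first reference; reuse it afterwards.
      Object existing = current.computeIfAbsent(parts[i], k -> new LinkedHashMap<String, Object>());
      if (!(existing instanceof Map)) {
        throw new IllegalArgumentException(parts[i] + " already has type " + existing + ", not a row");
      }
      @SuppressWarnings("unchecked")
      Map<String, Object> next = (Map<String, Object>) existing;
      current = next;
    }
    String leaf = parts[parts.length - 1];
    if (current.containsKey(leaf)) {
      throw new IllegalArgumentException("Duplicate field: " + path);
    }
    current.put(leaf, type);
    return this;
  }

  Map<String, Object> build() {
    return root;
  }
}
```

For example, adding "a.b" (STRING), "a.c" (INT32) and "d" (INT32) yields a row field "a" containing b and c, followed by d.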
[jira] [Created] (BEAM-7333) Select need the ability to rename fields
Reuven Lax created BEAM-7333: Summary: Select need the ability to rename fields Key: BEAM-7333 URL: https://issues.apache.org/jira/browse/BEAM-7333 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax There needs to be a way to rename fields in Select - e.g. Select("field1").as("field2"). While in many cases the RenameFields transform could be used, that doesn't help when a Select would return conflicting names - e.g. Select("a.c", "b.c"). In this case the Select transform would fail because the output schema would have two fields with the same name. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
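The naming conflict can be shown with a small hypothetical helper. Based on the example above, the sketch assumes a selected nested path is named by its last component by default, so "a.c" and "b.c" both map to "c". An optional rename map stands in for the proposed .as(...) syntax; it is not Beam's API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper computing Select output field names, with optional renames. */
final class SelectOutputNames {

  static List<String> outputNames(List<String> paths, Map<String, String> renames) {
    List<String> names = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    for (String path : paths) {
      // Default name: the last component of the dotted path ("a.c" -> "c").
      String name = renames.getOrDefault(path, path.substring(path.lastIndexOf('.') + 1));
      if (!seen.add(name)) {
        throw new IllegalArgumentException(
            "Select would produce two fields named '" + name + "' (second from " + path + ")");
      }
      names.add(name);
    }
    return names;
  }
}
```

Without renames, selecting "a.c" and "b.c" fails. Renaming "b.c" to "b_c" produces the output names [c, b_c].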
[jira] [Commented] (BEAM-7116) Remove KV from Schema transforms
[ https://issues.apache.org/jira/browse/BEAM-7116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16841044#comment-16841044 ] Reuven Lax commented on BEAM-7116: -- Transforms like Group that return KV today should instead return a two-field schema. The default field names should be "key" and "value", but ideally these transforms should allow specifying the field names. > Remove KV from Schema transforms > > > Key: BEAM-7116 > URL: https://issues.apache.org/jira/browse/BEAM-7116 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Priority: Major > > Instead of returning KV objects, we should return a Schema with two fields. > The Convert transform should be able to convert these to KV objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-7174) Add transforms for modifying schemas
[ https://issues.apache.org/jira/browse/BEAM-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-7174. -- Resolution: Fixed Fix Version/s: 2.12.0 > Add transforms for modifying schemas > > > Key: BEAM-7174 > URL: https://issues.apache.org/jira/browse/BEAM-7174 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > Time Spent: 4h 20m > Remaining Estimate: 0h > > We need transforms to add fields, remove fields, and rename fields. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-6602) Support schemas in BigQueryIO.Write
[ https://issues.apache.org/jira/browse/BEAM-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6602. -- Resolution: Fixed Fix Version/s: 2.12.0 > Support schemas in BigQueryIO.Write > --- > > Key: BEAM-6602 > URL: https://issues.apache.org/jira/browse/BEAM-6602 > Project: Beam > Issue Type: Sub-task > Components: io-java-gcp >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-6536) ParDo should allow any type with a compatible registered schema in the @Element parameter
[ https://issues.apache.org/jira/browse/BEAM-6536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6536. -- Resolution: Fixed Fix Version/s: 2.12.0 > ParDo should allow any type with a compatible registered schema in the > @Element parameter > - > > Key: BEAM-6536 > URL: https://issues.apache.org/jira/browse/BEAM-6536 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > > We should stop special casing Row here -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-6240) Allow users to annotate POJOs and JavaBeans for richer functionality
[ https://issues.apache.org/jira/browse/BEAM-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax resolved BEAM-6240. -- Resolution: Fixed Fix Version/s: 2.12.0 > Allow users to annotate POJOs and JavaBeans for richer functionality > > > Key: BEAM-6240 > URL: https://issues.apache.org/jira/browse/BEAM-6240 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > Time Spent: 7h 50m > Remaining Estimate: 0h > > Desired annotations: > * SchemaIgnore - ignore this field > * FieldName - allow the user to explicitly specify a field name > * SchemaCreate - register a function to be used to create an object (so > fields can be final, and no default constructor need be assumed). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
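A rough sketch of how such annotations could drive schema inference through reflection is shown below. SchemaIgnore and FieldName are local toy definitions that mirror the issue text, not imports from Beam. SchemaCreate is left out for brevity. Results are sorted by name because getDeclaredFields() does not promise any particular order.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy annotation (hypothetical): exclude this field from the inferred schema. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SchemaIgnore {}

/** Toy annotation (hypothetical): use an explicit schema field name. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface FieldName {
  String value();
}

/** Example POJO used to exercise the sketch. */
class ExampleUser {
  @FieldName("user_name")
  String name;

  int age;

  @SchemaIgnore
  String cachedDisplayName; // derived value that should not become a schema field
}

final class AnnotatedSchemaInference {

  /** Infers schema field name -> Java type name, honoring the toy annotations. */
  static SortedMap<String, String> inferFields(Class<?> clazz) {
    SortedMap<String, String> fields = new TreeMap<>();
    for (Field f : clazz.getDeclaredFields()) {
      if (Modifier.isStatic(f.getModifiers())
          || f.isSynthetic()
          || f.isAnnotationPresent(SchemaIgnore.class)) {
        continue;
      }
      FieldName rename = f.getAnnotation(FieldName.class);
      fields.put(rename != null ? rename.value() : f.getName(), f.getType().getSimpleName());
    }
    return fields;
  }
}
```

For ExampleUser, this yields {age=int, user_name=String}. The ignored field is absent and the renamed field appears under its explicit name.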
[jira] [Created] (BEAM-7301) Beam transforms reorder fields
Reuven Lax created BEAM-7301: Summary: Beam transforms reorder fields Key: BEAM-7301 URL: https://issues.apache.org/jira/browse/BEAM-7301 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Yueyang Qiu Currently transforms such as Select, DropFields, RenameFields, and AddFields can create schemas whose fields are in an unexpected order. The problem is that FieldAccessDescriptor stores top-level fields and nested fields separately, so there's no way to tell the relative order between them. To fix this we should refactor FieldAccessDescriptor: instead of storing these separately it should store a single list, where each item in the list might optionally have a nested FieldAccessDescriptor. Expected behavior from the transforms: DropFields: preserves the order of the original schema. RenameFields: preserves the order of the original schema. AddFields: adds fields in the order specified. If multiple nested fields are selected, the first reference to the top-level field wins (e.g. adding "a.b", "c", "a.d" results in adding a before c). Select: selects fields in the order specified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
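The "first reference wins" rule can be sketched with a single ordered map. This mirrors the proposed single-list FieldAccessDescriptor: each top-level field appears once, at its first reference, with any nested selections attached to it. FieldOrder is a hypothetical name, and the sketch handles only one level of nesting.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical one-level sketch: one ordered entry per top-level field, with nested selections. */
final class FieldOrder {

  static Map<String, List<String>> groupByTopLevel(List<String> paths) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (String path : paths) {
      int dot = path.indexOf('.');
      String top = dot < 0 ? path : path.substring(0, dot);
      // computeIfAbsent keeps the position of the first reference to this top-level field.
      List<String> nested = grouped.computeIfAbsent(top, k -> new ArrayList<>());
      if (dot >= 0) {
        nested.add(path.substring(dot + 1));
      }
    }
    return grouped;
  }
}
```

For the paths "a.b", "c", "a.d", the result is {a=[b, d], c=[]}, so a is placed before c.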
[jira] [Commented] (BEAM-7247) DoFnSignatures analysis of @ProcessElement with @Element fails when Java SDK is used from Kotlin
[ https://issues.apache.org/jira/browse/BEAM-7247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836556#comment-16836556 ] Reuven Lax commented on BEAM-7247: -- I think the issue might be that our reflection analysis looks for an exact match of type, so we don't currently allow superclasses or interfaces. > DoFnSignatures analysis of @ProcessElement with @Element fails when Java SDK > is used from Kotlin > > > Key: BEAM-7247 > URL: https://issues.apache.org/jira/browse/BEAM-7247 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Reuven Lax >Priority: Major > > https://stackoverflow.com/questions/55908999/kotlin-iterable-not-supported-in-apache-beam/55911859#55911859 > https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-stack-trace-txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
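The difference between the suspected exact-type check and a check that accepts superclasses and interfaces can be shown with plain java.lang.Class methods. This simplified sketch compares raw classes only. It ignores generic type arguments, which a real DoFn signature analysis would also have to handle. The class name is hypothetical.

```java
/** Hypothetical helpers contrasting exact-type matching with assignability. */
final class ElementTypeMatch {

  /** Suspected current behavior: the declared parameter type must equal the element type. */
  static boolean exactMatch(Class<?> declaredParameterType, Class<?> elementType) {
    return declaredParameterType.equals(elementType);
  }

  /** Looser rule: accept a parameter declared as a superclass or interface of the element type. */
  static boolean assignableMatch(Class<?> declaredParameterType, Class<?> elementType) {
    return declaredParameterType.isAssignableFrom(elementType);
  }
}
```

With elements of type List, an @Element parameter declared as Iterable fails the exact check but passes the assignability check. The reverse direction (List parameter, Iterable elements) is still rejected.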
[jira] [Created] (BEAM-7246) Create a Spanner IO for Python
Reuven Lax created BEAM-7246: Summary: Create a Spanner IO for Python Key: BEAM-7246 URL: https://issues.apache.org/jira/browse/BEAM-7246 Project: Beam Issue Type: Bug Components: io-python-gcp Reporter: Reuven Lax Assignee: Shehzaad Nakhoda -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reuven Lax reassigned BEAM-2535: Assignee: Shehzaad Nakhoda (was: Batkhuyag Batsaikhan) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. 
In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
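The proposed semantics can be modeled with a few arithmetic rules. This toy sketch makes the following assumptions:

- Timestamps are plain longs (for example, milliseconds).
- The timer watermark is the minimum output timestamp among pending timers, or positive infinity when none are pending.
- A window expires once the effective watermark passes its end plus allowed lateness.

All names are hypothetical and not part of any runner.

```java
import java.util.Collection;

/** Toy model (hypothetical) of a timer watermark that holds back expiration and output. */
final class TimerWatermarkModel {

  /** Earliest output timestamp among pending timers; Long.MAX_VALUE stands for "no hold". */
  static long timerWatermark(Collection<Long> pendingTimerOutputTimestamps) {
    long min = Long.MAX_VALUE;
    for (long ts : pendingTimerOutputTimestamps) {
      min = Math.min(min, ts);
    }
    return min;
  }

  /** Late-data dropping, window expiration, and the output watermark use the minimum of both. */
  static long effectiveWatermark(long inputWatermark, long timerWatermark) {
    return Math.min(inputWatermark, timerWatermark);
  }

  /** A window can be garbage collected only once the effective watermark passes end + lateness. */
  static boolean windowExpired(
      long windowEnd, long allowedLateness, long inputWatermark, long timerWatermark) {
    return effectiveWatermark(inputWatermark, timerWatermark) > windowEnd + allowedLateness;
  }
}
```

With no pending timers, a window ending at 1000 is expired once the input watermark reaches 5000. A pending timer whose output timestamp is 900 holds the effective watermark at 900, so the same window is kept alive.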
[jira] [Commented] (BEAM-7210) Test ParDoSchemaTest#testInferredSchemaPipeline is flaky
[ https://issues.apache.org/jira/browse/BEAM-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831829#comment-16831829 ] Reuven Lax commented on BEAM-7210: -- I see the problem. I changed things from POJO to AutoValue. Java reflection makes no guarantees about the order in which it returns things, so schema inference is inherently nondeterministic in the order of fields. However, reflection tends to return member variables in declaration order, so the test happened to work before. When it comes to listing member methods, reflection is far less deterministic (I suspect it returns things in hashmap order or something). The fix is to access fields by name instead of by index: replace row.getString(0) with row.getString("stringField"). I'll send a PR to fix this. > Test ParDoSchemaTest#testInferredSchemaPipeline is flaky > > > Key: BEAM-7210 > URL: https://issues.apache.org/jira/browse/BEAM-7210 > Project: Beam > Issue Type: Bug > Components: runner-flink, sdk-java-core >Reporter: Maximilian Michels >Priority: Major > Fix For: 2.13.0 > > > Test ParDoSchemaTest#testInferredSchemaPipeline is flaky. Please see > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/3918/testReport/junit/org.apache.beam.sdk.transforms/ParDoSchemaTest/testInferredSchemaPipeline/] > It seems like the backing row data is not populated in a deterministic way: > {noformat} > Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.String > at org.apache.beam.sdk.values.Row.getString(Row.java:279) > at > org.apache.beam.sdk.transforms.ParDoSchemaTest$12.process(ParDoSchemaTest.java:391){noformat} > CC [~reuvenlax] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
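The failure mode and the fix can be reproduced with a toy row class. ToyRow is hypothetical, not org.apache.beam.sdk.values.Row. Two rows carry the same data, but their fields were inferred in different orders. Positional access returns different values for the two rows, while name-based access does not.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy row (hypothetical): values stored positionally, plus the field names in inferred order. */
final class ToyRow {
  private final List<String> fieldNames;
  private final List<Object> values;

  ToyRow(List<String> fieldNames, List<Object> values) {
    if (fieldNames.size() != values.size()) {
      throw new IllegalArgumentException("Field and value counts differ");
    }
    this.fieldNames = new ArrayList<>(fieldNames);
    this.values = new ArrayList<>(values);
  }

  /** Positional access: only correct if the inferred field order is stable. */
  Object get(int index) {
    return values.get(index);
  }

  /** Name-based access: unaffected by the order in which fields were inferred. */
  Object get(String name) {
    int index = fieldNames.indexOf(name);
    if (index < 0) {
      throw new IllegalArgumentException("No field named " + name);
    }
    return values.get(index);
  }

  String getString(String name) {
    return (String) get(name);
  }
}
```

Casting the result of get(0) to String on the second row would throw the same kind of ClassCastException reported in the test failure.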
[jira] [Created] (BEAM-7189) Remove FieldType metadata
Reuven Lax created BEAM-7189: Summary: Remove FieldType metadata Key: BEAM-7189 URL: https://issues.apache.org/jira/browse/BEAM-7189 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Reuven Lax We no longer need this metadata now that we have logical types. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-7174) Add transforms for modifying schemas
Reuven Lax created BEAM-7174: Summary: Add transforms for modifying schemas Key: BEAM-7174 URL: https://issues.apache.org/jira/browse/BEAM-7174 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Reuven Lax We need transforms to add fields, remove fields, and rename fields. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
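The three operations can be sketched over an ordered map from field name to type. This is a hypothetical sketch that handles top-level fields only; SchemaEdits is not a Beam class. It follows the ordering expectations listed for BEAM-7301 in this thread:

- Drop and rename keep the original field order.
- Add appends fields in the order requested.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical top-level schema edits over an ordered map of field name to type name. */
final class SchemaEdits {

  /** Appends a field; rejects duplicates. */
  static Map<String, String> addField(Map<String, String> schema, String name, String type) {
    if (schema.containsKey(name)) {
      throw new IllegalArgumentException("Field already exists: " + name);
    }
    Map<String, String> out = new LinkedHashMap<>(schema);
    out.put(name, type);
    return out;
  }

  /** Removes a field; the remaining fields keep their original order. */
  static Map<String, String> dropField(Map<String, String> schema, String name) {
    Map<String, String> out = new LinkedHashMap<>(schema);
    out.remove(name);
    return out;
  }

  /** Renames a field in place, so it keeps its original position. */
  static Map<String, String> renameField(Map<String, String> schema, String from, String to) {
    if (!schema.containsKey(from)) {
      throw new IllegalArgumentException("No such field: " + from);
    }
    if (!from.equals(to) && schema.containsKey(to)) {
      throw new IllegalArgumentException("Field already exists: " + to);
    }
    Map<String, String> out = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : schema.entrySet()) {
      out.put(e.getKey().equals(from) ? to : e.getKey(), e.getValue());
    }
    return out;
  }
}
```

Each operation returns a new map rather than mutating its input, which loosely mirrors how a schema transform produces a new output schema.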
[jira] [Created] (BEAM-7168) Convert should support more boxing and unboxing
Reuven Lax created BEAM-7168: Summary: Convert should support more boxing and unboxing Key: BEAM-7168 URL: https://issues.apache.org/jira/browse/BEAM-7168 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Convert should support unboxing to a primitive. It should also support automatic boxing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
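A minimal sketch of a boxing-aware compatibility check follows, assuming types are compared as java.lang.Class objects. It treats each primitive as interchangeable with its wrapper and then applies ordinary reference assignability. It deliberately does not model primitive widening (such as int to long). The class name BoxingCompat is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical compatibility check that allows automatic boxing and unboxing. */
final class BoxingCompat {
  private static final Map<Class<?>, Class<?>> BOXED = new HashMap<>();

  static {
    BOXED.put(boolean.class, Boolean.class);
    BOXED.put(byte.class, Byte.class);
    BOXED.put(short.class, Short.class);
    BOXED.put(int.class, Integer.class);
    BOXED.put(long.class, Long.class);
    BOXED.put(float.class, Float.class);
    BOXED.put(double.class, Double.class);
    BOXED.put(char.class, Character.class);
  }

  /** Maps a primitive class to its wrapper; any other class is returned unchanged. */
  static Class<?> boxed(Class<?> type) {
    return BOXED.getOrDefault(type, type);
  }

  /** True if a value of type `from` can be converted to `to`, boxing or unboxing as needed. */
  static boolean convertible(Class<?> from, Class<?> to) {
    // Note: a null Integer cannot be unboxed to int, so a real converter also needs a null check.
    return boxed(to).isAssignableFrom(boxed(from));
  }
}
```

Under this rule, int converts to Integer and back, and int also converts to Number via boxing, while String never converts to int.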
[jira] [Commented] (BEAM-7002) [beam_PostCommit_Java_Nexmark_Dataflow] [Nexmark] NullPointerException in Main
[ https://issues.apache.org/jira/browse/BEAM-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824560#comment-16824560 ] Reuven Lax commented on BEAM-7002: -- Currently they are only used as an optimization (to short-circuit expensive equals() comparisons). On Tue, Apr 23, 2019 at 2:10 PM Kenneth Knowles (JIRA) > [beam_PostCommit_Java_Nexmark_Dataflow] [Nexmark] NullPointerException in Main > -- > > Key: BEAM-7002 > URL: https://issues.apache.org/jira/browse/BEAM-7002 > Project: Beam > Issue Type: Bug > Components: dsl-sql, test-failures >Reporter: Yueyang Qiu >Assignee: Reuven Lax >Priority: Blocker > Labels: currently-failing > Fix For: 2.12.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > Failing Jenkins Job: > [https://builds.apache.org/job/beam_PostCommit_Java_Nexmark_Dataflow/2304/console] > Exception in thread "main" java.lang.RuntimeException: > java.lang.NullPointerException at > org.apache.beam.sdk.nexmark.Main.runAll(Main.java:127) at > org.apache.beam.sdk.nexmark.Main.main(Main.java:414) Caused by: > java.lang.NullPointerException at > org.apache.beam.runners.core.construction.SchemaTranslation.toProto(SchemaTranslation.java:57) > at > org.apache.beam.runners.core.construction.SchemaTranslation.toProto(SchemaTranslation.java:85) > at > org.apache.beam.runners.core.construction.SchemaTranslation.toProto(SchemaTranslation.java:73) > at > org.apache.beam.runners.core.construction.SchemaTranslation.toProto(SchemaTranslation.java:60) > at > org.apache.beam.runners.dataflow.util.SchemaCoderCloudObjectTranslator.toCloudObject(SchemaCoderCloudObjectTranslator.java:55) > at > org.apache.beam.runners.dataflow.util.SchemaCoderCloudObjectTranslator.toCloudObject(SchemaCoderCloudObjectTranslator.java:31) > at > org.apache.beam.runners.dataflow.util.CloudObjects.asCloudObject(CloudObjects.java:69) > at >
org.apache.beam.runners.dataflow.util.CloudObjectTranslators.addComponents(CloudObjectTranslators.java:59) > at > org.apache.beam.runners.dataflow.util.CloudObjectTranslators.access$000(CloudObjectTranslators.java:51) > at > org.apache.beam.runners.dataflow.util.CloudObjectTranslators$6.toCloudObject(CloudObjectTranslators.java:250) > at > org.apache.beam.runners.dataflow.util.CloudObjectTranslators$6.toCloudObject(CloudObjectTranslators.java:245) > at > org.apache.beam.runners.dataflow.util.CloudObjects.asCloudObject(CloudObjects.java:69) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateCoder(DataflowPipelineTranslator.java:1151) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$600(DataflowPipelineTranslator.java:119) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$StepTranslator.addOutput(DataflowPipelineTranslator.java:706) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$StepTranslator.addOutput(DataflowPipelineTranslator.java:652) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:971) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:952) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:949) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:475) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at >
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251) > at > org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:414) > at > org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:173) > at >
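This sketch assumes, from the surrounding thread, that "they" refers to schema UUIDs. Under that assumption, the optimization can be sketched as an equals() fast path. ToySchema is hypothetical. A matching non-null UUID lets equals() return early, while a missing UUID simply falls back to the structural comparison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical schema whose UUID is only an equality shortcut, never required. */
final class ToySchema {
  private final List<String> fieldNames;
  private final UUID uuid; // may be null: not every schema is assigned one

  ToySchema(List<String> fieldNames, UUID uuid) {
    this.fieldNames = new ArrayList<>(fieldNames);
    this.uuid = uuid;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ToySchema)) {
      return false;
    }
    ToySchema other = (ToySchema) o;
    // Fast path: matching non-null UUIDs are assumed to identify the same schema.
    if (uuid != null && uuid.equals(other.uuid)) {
      return true;
    }
    // Otherwise fall back to the (potentially expensive) structural comparison.
    return fieldNames.equals(other.fieldNames);
  }

  @Override
  public int hashCode() {
    // Structural hash; consistent with equals() as long as equal UUIDs imply equal fields.
    return fieldNames.hashCode();
  }
}
```

Because the UUID is only a shortcut, two structurally identical schemas compare equal whether or not either one has a UUID.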
[jira] [Commented] (BEAM-7002) [beam_PostCommit_Java_Nexmark_Dataflow] [Nexmark] NullPointerException in Main
[ https://issues.apache.org/jira/browse/BEAM-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819565#comment-16819565 ] Reuven Lax commented on BEAM-7002: -- [~apilloud] fix is out for review. pr/8325 > [beam_PostCommit_Java_Nexmark_Dataflow] [Nexmark] NullPointerException in Main > -- > > Key: BEAM-7002 > URL: https://issues.apache.org/jira/browse/BEAM-7002 > Project: Beam > Issue Type: Bug > Components: dsl-sql, test-failures >Reporter: Yueyang Qiu >Assignee: Reuven Lax >Priority: Critical > Labels: currently-failing > Time Spent: 10m > Remaining Estimate: 0h > > Failing Jenkins Job: > [https://builds.apache.org/job/beam_PostCommit_Java_Nexmark_Dataflow/2304/console]
[jira] [Commented] (BEAM-7002) [beam_PostCommit_Java_Nexmark_Dataflow] [Nexmark] NullPointerException in Main
[ https://issues.apache.org/jira/browse/BEAM-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819533#comment-16819533 ] Reuven Lax commented on BEAM-7002: -- It appears the problem is this line: RunnerApi.Schema.Builder builder = RunnerApi.Schema.newBuilder().setId(schema.getUUID().toString()); Currently we don't guarantee that uuid is always set. The simple fix for now is to check whether getUUID returns null here. Longer term, we probably should guarantee that all schemas have UUIDs. > [beam_PostCommit_Java_Nexmark_Dataflow] [Nexmark] NullPointerException in Main > -- > > Key: BEAM-7002 > URL: https://issues.apache.org/jira/browse/BEAM-7002 > Project: Beam > Issue Type: Bug > Components: dsl-sql, test-failures >Reporter: Yueyang Qiu >Assignee: Reuven Lax >Priority: Critical > Labels: currently-failing > > Failing Jenkins Job: > [https://builds.apache.org/job/beam_PostCommit_Java_Nexmark_Dataflow/2304/console]
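The short-term fix described in the comment amounts to a null guard before calling toString(). The sketch below does not use RunnerApi.Schema.Builder; instead it models the proto as a map of the fields that were set. The class and method names are hypothetical. It only shows that the id is set when, and only when, a UUID exists.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for schema-to-proto translation; the "proto" is a map of set fields. */
final class SchemaToProtoSketch {

  static Map<String, String> protoFields(UUID schemaUuid) {
    Map<String, String> proto = new LinkedHashMap<>();
    // Calling schemaUuid.toString() unconditionally throws NullPointerException
    // when no UUID was assigned. Short-term fix: only set the id when one is present.
    if (schemaUuid != null) {
      proto.put("id", schemaUuid.toString());
    }
    return proto;
  }
}
```

The longer-term direction in the comment (guaranteeing that every schema has a UUID) would make this guard unnecessary.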
[jira] [Commented] (BEAM-7042) beam-sdks-java-core leaks antlr4 dependency
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814926#comment-16814926 ] Reuven Lax commented on BEAM-7042: -- All tests appear to pass. However we probably should try and vendor this dependency so we don't have any more weirdness. On Wed, Apr 10, 2019 at 3:52 PM Michael Luckey (JIRA) > beam-sdks-java-core leaks antlr4 dependency > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Assignee: Michael Luckey >Priority: Major > Fix For: 2.12.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > beam-sdks-java-core brings antlr4 and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlt, jsonp in an outdated > version at least for the one breaking my apps). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) beam-sdks-java-core leaks antlr4 dependency
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814548#comment-16814548 ] Reuven Lax commented on BEAM-7042: -- It was added because some things were breaking without it. I don't quite remember what broke though. On Wed, Apr 10, 2019 at 4:23 AM Michael Luckey (JIRA) > beam-sdks-java-core leaks antlr4 dependency > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Assignee: Michael Luckey >Priority: Major > Fix For: 2.12.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > beam-sdks-java-core brings antlr4 and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlt, jsonp in an outdated > version at least for the one breaking my apps). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are way fatter and conflicting than before
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814115#comment-16814115 ] Reuven Lax commented on BEAM-7042: -- I'm trying to figure this out. The last PR that changed the antlr rules went in on Jan 25 - before Beam 2.10 - and even that one didn't modify the antlr dependency. The antlr dependency has been fixed at antlr 4.7 since Jan 15. I'm not entirely sure how you could see this in 2.12 and not in 2.10 or 2.11. The only PR I can find that might have had an effect is the one that upgraded Gradle from Gradle 4 to Gradle 5. On Tue, Apr 9, 2019, 9:47 PM Romain Manni-Bucau (JIRA) > [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are > way fatter and conflicting than before > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Assignee: Reuven Lax >Priority: Major > Fix For: 2.13.0 > > > Hi guys, > seems current sdk brings antlr and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlt, jsonp in an outdated > version at least for the one breaking my apps) > can it be cleaned up for the 2.12 to avoid to break application please? > Thanks, > Romain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are way fatter and conflicting than before
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813875#comment-16813875 ] Reuven Lax commented on BEAM-7042: -- Given that Antlr has been part of core since 2.10, I don't think this is a release blocker. We should fix this bogus dependency on jsonp, but for now I'll move this bug to the next release. On Tue, Apr 9, 2019 at 3:33 PM Andrew Pilloud (JIRA) > [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are > way fatter and conflicting than before > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Assignee: Reuven Lax >Priority: Major > Fix For: 2.12.0 > > > Hi guys, > seems current sdk brings antlr and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlt, jsonp in an outdated > version at least for the one breaking my apps) > can it be cleaned up for the 2.12 to avoid to break application please? > Thanks, > Romain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are way fatter and conflicting than before
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813669#comment-16813669 ] Reuven Lax commented on BEAM-7042: -- Question for [~romain.manni-bucau]: the Antlr dependency went in in January, and so was present in both Beam 2.10 and Beam 2.11. Is there a reason that you are seeing a problem only now with 2.12? Has something changed with this dependency? > [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are > way fatter and conflicting than before > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Priority: Major > Fix For: 2.12.0 > > > Hi guys, > seems current sdk brings antlr and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlr, jsonp in an outdated > version at least for the one breaking my apps) > can it be cleaned up for the 2.12 to avoid to break application please? > Thanks, > Romain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are way fatter and conflicting than before
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813662#comment-16813662 ] Reuven Lax commented on BEAM-7042: -- The current design of schemas is for them to be a type system for Beam, which means they are the core of Beam now, not a layer on top. This was the design that was agreed upon, and over the past year or more schemas have been tightly integrated into the core. This can of course be reevaluated on the dev list, but that is off topic for this Jira issue. Dependency conflicts can (and sadly do) happen due to any and all of our dependencies, so there's nothing special about Antlr here. Likely the right answer is to vendor Antlr. Is there documentation anywhere about how to go about the vendoring process? > [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are > way fatter and conflicting than before > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Priority: Major > Fix For: 2.12.0 > > > Hi guys, > seems current sdk brings antlr and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlr, jsonp in an outdated > version at least for the one breaking my apps) > can it be cleaned up for the 2.12 to avoid to break application please? > Thanks, > Romain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are way fatter and conflicting than before
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813547#comment-16813547 ] Reuven Lax commented on BEAM-7042: -- Though I'm a bit confused as to why antlr depends on jsonp at all :P > [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are > way fatter and conflicting than before > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Priority: Major > Fix For: 2.12.0 > > > Hi guys, > seems current sdk brings antlr and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlr, jsonp in an outdated > version at least for the one breaking my apps) > can it be cleaned up for the 2.12 to avoid to break application please? > Thanks, > Romain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-7042) [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are way fatter and conflicting than before
[ https://issues.apache.org/jira/browse/BEAM-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813541#comment-16813541 ] Reuven Lax commented on BEAM-7042: -- Yes, I do believe it needs to be in core. Schema select-statement parsing will be integrated into DoFnRunner (so you can select schema fields in ParDo, as discussed in the past on the dev list), and DoFnRunner is in core. Yes, I think this should probably be vendored. > [regression] org.apache.beam:beam-sdks-java-core:jar:2.12.0 dependencies are > way fatter and conflicting than before > --- > > Key: BEAM-7042 > URL: https://issues.apache.org/jira/browse/BEAM-7042 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Romain Manni-Bucau >Priority: Major > Fix For: 2.12.0 > > > Hi guys, > seems current sdk brings antlr and its transitive deps which is quite > unlikely and has a huge probability to conflict (antlr, jsonp in an outdated > version at least for the one breaking my apps) > can it be cleaned up for the 2.12 to avoid to break application please? > Thanks, > Romain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-6753) Create proto representation for schemas
[ https://issues.apache.org/jira/browse/BEAM-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812109#comment-16812109 ] Reuven Lax commented on BEAM-6753: -- Right now I'm only using this proto in the Dataflow runner. Before using this proto more widely in portability, I think we need to have a broader conversation on the dev list (or on this JIRA) about what the actual proto should look like. I've talked to a few people who are interested in plumbing schemas through the Python API. The basic idea is to use the Python typehints framework to detect and enforce schema matching. Once we have that, the basic schema transforms (filter, group, join, select, etc.) need to be implemented in Python. Given the popularity of pandas, a dataframe-compatible API would likely be a good idea in Python (it doesn't have to be the primary API; we could also implement the separate PTransforms, at which point it would be trivial to create a dataframe wrapper around them). > Create proto representation for schemas > --- > > Key: BEAM-6753 > URL: https://issues.apache.org/jira/browse/BEAM-6753 > Project: Beam > Issue Type: Sub-task > Components: beam-model >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6953) BigQueryIO has constants that should be PipelineOptions
Reuven Lax created BEAM-6953: Summary: BigQueryIO has constants that should be PipelineOptions Key: BEAM-6953 URL: https://issues.apache.org/jira/browse/BEAM-6953 Project: Beam Issue Type: Bug Components: io-java-gcp Reporter: Reuven Lax -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-6756) Support lazy iterables in schemas
[ https://issues.apache.org/jira/browse/BEAM-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796558#comment-16796558 ] Reuven Lax commented on BEAM-6756: -- This would require adding a new ITERABLE schema type (which would not support operations such as size()). > Support lazy iterables in schemas > - > > Key: BEAM-6756 > URL: https://issues.apache.org/jira/browse/BEAM-6756 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > > The iterables returned by GroupByKey and CoGroupByKey are lazy; this allows a > runner to page data into memory if the full iterable is too large. We > currently don't support this in Schemas, so the Schema Group and CoGroup > transforms materialize all data into memory. We should add support for this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
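To make the lazy-versus-materialized distinction concrete, here is a plain-Java sketch with no Beam dependency. PagedIterableSketch, PagedIterable and numbers are hypothetical names, not Beam API, and the in-memory page loader stands in for whatever a runner would actually page from. The iterable fetches fixed-size pages only as iteration advances and deliberately exposes no size(), which is the restriction an ITERABLE schema type would carry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

class PagedIterableSketch {

  /** Iteration-only view over paged data; deliberately offers no size(). */
  static final class PagedIterable<T> implements Iterable<T> {
    // Hypothetical page loader: page index -> page contents (an empty page means "no more data").
    private final IntFunction<List<T>> pageLoader;

    PagedIterable(IntFunction<List<T>> pageLoader) {
      this.pageLoader = pageLoader;
    }

    @Override
    public Iterator<T> iterator() {
      return new Iterator<T>() {
        private int nextPage = 0;
        private Iterator<T> current = Collections.emptyIterator();
        private boolean exhausted = false;

        @Override
        public boolean hasNext() {
          // Fetch the next page only when the current one is used up.
          while (!current.hasNext() && !exhausted) {
            List<T> page = pageLoader.apply(nextPage++);
            if (page.isEmpty()) {
              exhausted = true;
            } else {
              current = page.iterator();
            }
          }
          return current.hasNext();
        }

        @Override
        public T next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          return current.next();
        }
      };
    }
  }

  /** Hypothetical source: the integers 0..total-1 served in pages; loads[0] counts page fetches. */
  static PagedIterable<Integer> numbers(int total, int pageSize, int[] loads) {
    return new PagedIterable<>(pageIndex -> {
      loads[0]++;
      List<Integer> page = new ArrayList<>();
      int start = pageIndex * pageSize;
      for (int i = start; i < Math.min(start + pageSize, total); i++) {
        page.add(i);
      }
      return page;
    });
  }

  public static void main(String[] args) {
    int[] loads = {0};
    Iterator<Integer> it = numbers(10, 2, loads).iterator();
    for (int i = 0; i < 3; i++) {
      it.next(); // reading three elements needs only the first two pages
    }
    System.out.println("pages loaded: " + loads[0]);
  }
}
```

With a page size of 2, reading three elements triggers only two page loads; later pages are fetched only if the consumer keeps iterating.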
[jira] [Created] (BEAM-6858) Support side inputs injected into a DoFn
Reuven Lax created BEAM-6858: Summary: Support side inputs injected into a DoFn Key: BEAM-6858 URL: https://issues.apache.org/jira/browse/BEAM-6858 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Reuven Lax Beam currently supports injecting main inputs into a DoFn process method. A user can write the following: @ProcessElement public void process(@Element InputT element) And Beam will (using ByteBuddy code generation) inject the input element into the process method. We would like to also support the same for side inputs. For example: @ProcessElement public void process(@Element InputT element, @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) This requires the existing process-method analysis framework to capture these side inputs. The ParDo code would have to verify the type of the side input and include them in the list of side inputs. This would also eliminate the need for the user to explicitly call withSideInputs on the ParDo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
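The analysis and injection steps described above can be sketched with plain Java reflection. This is a hypothetical illustration, not Beam's implementation: the ProcessElement, Element and SideInput annotations below are local stand-ins, side-input values come from an ordinary Map rather than from views, and only boxed parameter types are type-checked. declaredSideInputs shows how the tags could be discovered from the method signature (so no withSideInputs call is needed), and invoke shows the type verification before the call.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SideInputInjectionSketch {

  // Hypothetical stand-ins for the annotations used in the proposal (not Beam's classes).
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface ProcessElement {}
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface Element {}
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface SideInput {
    String value(); // the side-input tag
  }

  static Method processMethod(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        return m;
      }
    }
    throw new IllegalArgumentException("no @ProcessElement method on " + fnClass.getName());
  }

  /** Analysis step: the tags the function asks for, discovered from its signature. */
  static List<String> declaredSideInputs(Class<?> fnClass) {
    List<String> tags = new ArrayList<>();
    for (Parameter p : processMethod(fnClass).getParameters()) {
      SideInput s = p.getAnnotation(SideInput.class);
      if (s != null) {
        tags.add(s.value());
      }
    }
    return tags;
  }

  /** Invocation step: fills each annotated parameter, type-checking side inputs (boxed types only). */
  static Object invoke(Object fn, Object element, Map<String, Object> sideInputs) {
    Method m = processMethod(fn.getClass());
    Parameter[] params = m.getParameters();
    Object[] args = new Object[params.length];
    for (int i = 0; i < params.length; i++) {
      SideInput s = params[i].getAnnotation(SideInput.class);
      if (params[i].isAnnotationPresent(Element.class)) {
        args[i] = element;
      } else if (s != null) {
        Object value = sideInputs.get(s.value());
        if (!params[i].getType().isInstance(value)) {
          throw new IllegalArgumentException(
              "side input '" + s.value() + "' is not a " + params[i].getType().getSimpleName());
        }
        args[i] = value;
      } else {
        throw new IllegalArgumentException("unannotated parameter: " + params[i]);
      }
    }
    try {
      m.setAccessible(true);
      return m.invoke(fn, args);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  /** A user function written in the style proposed in the issue. */
  static class ExampleFn {
    @ProcessElement
    public String process(
        @Element String element, @SideInput("tag1") String input1, @SideInput("tag2") Integer input2) {
      return element + ":" + input1 + ":" + input2;
    }
  }

  public static void main(String[] args) {
    Map<String, Object> sideInputs = new HashMap<>();
    sideInputs.put("tag1", "a");
    sideInputs.put("tag2", 7);
    System.out.println(declaredSideInputs(ExampleFn.class)); // [tag1, tag2]
    System.out.println(invoke(new ExampleFn(), "e", sideInputs)); // e:a:7
  }
}
```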
[jira] [Created] (BEAM-6857) Support dynamic timers
Reuven Lax created BEAM-6857: Summary: Support dynamic timers Key: BEAM-6857 URL: https://issues.apache.org/jira/browse/BEAM-6857 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Reuven Lax The Beam timers API currently requires each timer to be statically specified in the DoFn. The user must provide a separate callback method per timer. For example: DoFn() { @TimerId("timer1") private final TimerSpec timer1 = TimerSpecs.timer(...); @TimerId("timer2") private final TimerSpec timer2 = TimerSpecs.timer(...); ... set timers in processElement ... @OnTimer("timer1") public void onTimer1() { ... } @OnTimer("timer2") public void onTimer2() { ... } } However, there are many cases where the user does not know the set of timers statically when writing their code. This happens when the timer tag should be based on the data. It also happens when writing a DSL on top of Beam, where the DSL author has to create DoFns but does not know statically which timers their users will want to set (e.g. Scio). The goal is to support dynamic timers, something like the following: DoFn() { @TimerId("timer") private final TimerSpec timer1 = TimerSpecs.dynamicTimer(...); @ProcessElement public void process(@TimerId("timer") DynamicTimer timer) { timer.set("tag1", ts); timer.set("tag2", ts); } @OnTimer("timer") public void onTimer1(@TimerTag String tag) { ... } } -- This message was sent by Atlassian JIRA (v7.6.3#76005)
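A minimal in-memory sketch of the dynamic-timer semantics, assuming event-time timers with long timestamps and a watermark advanced by the caller. DynamicTimerSketch, set and advanceTo are hypothetical names, not Beam API. Timers are keyed by a tag chosen at run time, re-setting a tag replaces its timestamp, and a single callback receives the tag of each timer that fires, mirroring the proposed single @OnTimer("timer") method with a @TimerTag parameter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class DynamicTimerSketch {
  // Pending timers: tag -> firing timestamp. Setting a tag again replaces its earlier timestamp.
  private final Map<String, Long> pending = new HashMap<>();

  /** Sets (or resets) the timer identified by a tag chosen at run time. */
  void set(String tag, long timestamp) {
    pending.put(tag, timestamp);
  }

  /** Fires every timer due at or before the watermark, in timestamp order, through one callback. */
  void advanceTo(long watermark, BiConsumer<String, Long> onTimer) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= watermark) {
        due.add(e.getKey());
      }
    }
    // Order by timestamp; break ties by tag so the firing order is deterministic.
    due.sort((a, b) -> {
      int byTime = Long.compare(pending.get(a), pending.get(b));
      return byTime != 0 ? byTime : a.compareTo(b);
    });
    // Remove the due timers before calling back, so a callback may safely set new ones.
    Map<String, Long> firing = new LinkedHashMap<>();
    for (String tag : due) {
      firing.put(tag, pending.remove(tag));
    }
    firing.forEach(onTimer);
  }

  public static void main(String[] args) {
    DynamicTimerSketch timers = new DynamicTimerSketch();
    timers.set("tag1", 10);
    timers.set("tag2", 5);
    timers.set("tag1", 12); // resetting tag1 replaces the earlier timestamp
    timers.advanceTo(15, (tag, ts) -> System.out.println("fired " + tag + " at " + ts));
  }
}
```

Running main fires tag2 (timestamp 5) before tag1 (timestamp 12), because the second set call for tag1 overwrote its original timestamp of 10.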
[jira] [Commented] (BEAM-6856) Support dynamic MapState on Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16795509#comment-16795509 ] Reuven Lax commented on BEAM-6856: -- Beam's state API supports MapState, a way of storing a tag-value map in the state for a given key. Currently the entire map for a given key must be stored in memory (unlike BagState, which is paginated when read from our service). The goal is to support out-of-memory MapState that supports fetching individual keys or paginated iteration over the map. Note: this depends on the Dataflow service delivering two new pieces of functionality, which will be a prerequisite for the Beam API work. > Support dynamic MapState on Dataflow > > > Key: BEAM-6856 > URL: https://issues.apache.org/jira/browse/BEAM-6856 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Reuven Lax >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
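The two access patterns the comment asks for, fetching individual keys and paginated iteration, can be sketched against a fake backing service. Everything here is hypothetical (FakeStateService, LazyMapState and the "page after key" protocol). It assumes the service can return entries in sorted key order starting after a given key, which is one common way to paginate but not necessarily what the Dataflow service provides.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

class PagedMapStateSketch {

  /** Hypothetical stand-in for the backing state service: sorted keys, point reads, range pages. */
  static final class FakeStateService {
    private final NavigableMap<String, String> data = new TreeMap<>();
    int calls = 0; // round trips made, to show what is actually fetched

    void put(String key, String value) {
      data.put(key, value);
    }

    String lookup(String key) {
      calls++;
      return data.get(key);
    }

    /** Up to 'limit' entries whose keys sort strictly after 'afterKey' (null = from the start). */
    List<Map.Entry<String, String>> page(String afterKey, int limit) {
      calls++;
      NavigableMap<String, String> tail = afterKey == null ? data : data.tailMap(afterKey, false);
      List<Map.Entry<String, String>> out = new ArrayList<>();
      for (Map.Entry<String, String> e : tail.entrySet()) {
        if (out.size() == limit) {
          break;
        }
        out.add(new AbstractMap.SimpleImmutableEntry<>(e));
      }
      return out;
    }
  }

  /** Map state that never materializes the whole map: point lookups plus paged iteration. */
  static final class LazyMapState {
    private final FakeStateService service;
    private final int pageSize; // must be positive

    LazyMapState(FakeStateService service, int pageSize) {
      this.service = service;
      this.pageSize = pageSize;
    }

    String get(String key) {
      return service.lookup(key); // fetches a single key
    }

    Iterable<Map.Entry<String, String>> entries() {
      return () -> new Iterator<Map.Entry<String, String>>() {
        private List<Map.Entry<String, String>> page = service.page(null, pageSize);
        private int pos = 0;

        @Override
        public boolean hasNext() {
          if (pos < page.size()) {
            return true;
          }
          if (page.size() < pageSize) {
            return false; // a short page means the map is exhausted
          }
          page = service.page(page.get(page.size() - 1).getKey(), pageSize);
          pos = 0;
          return !page.isEmpty();
        }

        @Override
        public Map.Entry<String, String> next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          return page.get(pos++);
        }
      };
    }
  }

  public static void main(String[] args) {
    FakeStateService service = new FakeStateService();
    for (String k : new String[] {"a", "b", "c", "d", "e"}) {
      service.put(k, k.toUpperCase());
    }
    LazyMapState state = new LazyMapState(service, 2);
    for (Map.Entry<String, String> e : state.entries()) {
      System.out.println(e.getKey() + "=" + e.getValue());
    }
    System.out.println("round trips: " + service.calls);
  }
}
```

With five entries and a page size of 2, a full iteration takes three round trips and holds at most two entries in memory at a time.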
[jira] [Created] (BEAM-6856) Support dynamic MapState on Dataflow
Reuven Lax created BEAM-6856: Summary: Support dynamic MapState on Dataflow Key: BEAM-6856 URL: https://issues.apache.org/jira/browse/BEAM-6856 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Reuven Lax -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6855) Side inputs are not supported when using the state API
Reuven Lax created BEAM-6855: Summary: Side inputs are not supported when using the state API Key: BEAM-6855 URL: https://issues.apache.org/jira/browse/BEAM-6855 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5645) RowCoder not equal after serialization
[ https://issues.apache.org/jira/browse/BEAM-5645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788257#comment-16788257 ] Reuven Lax commented on BEAM-5645: -- Do we simply need to add an equals() override? On Fri, Mar 8, 2019 at 3:13 AM Julien Tournay (JIRA) > RowCoder not equal after serialization > -- > > Key: BEAM-5645 > URL: https://issues.apache.org/jira/browse/BEAM-5645 > Project: Beam > Issue Type: Bug > Components: runner-core >Affects Versions: 2.6.0 >Reporter: Julien Tournay >Priority: Major > > The following code throws an exception: > {code:scala} > import org.apache.beam.sdk.coders._ > import org.apache.beam.sdk.schemas.Schema > val schema = > Schema > .builder() > .addInt32Field("c1") > .addStringField("c2") > .addDoubleField("c3") > .build() > val coder = RowCoder.of(schema) > org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(coder) > {code} > {code} > java.lang.IllegalStateException: Coder not equal to original after > serialization, indicating that the Coder may not implement serialization > correctly. Before: org.apache.beam.sdk.coders.RowCoder@1323050e, after: > org.apache.beam.sdk.coders.RowCoder@4c03150a > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
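The exception above is what ensureSerializable reports when a deserialized copy is not equal to the original, and that is the default outcome for any class that inherits Object.equals (identity). The plain-Java illustration below shows why an equals() override would help. IdentityCoder and ValueCoder are hypothetical stand-ins that hold a list of field names in place of a Schema; they are not RowCoder itself, and the sketch assumes the wrapped state compares by value, as the List does here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class SerializationEqualitySketch {

  /** Serializable, but equality falls back to object identity (no equals override). */
  static final class IdentityCoder implements Serializable {
    final List<String> schemaFields;

    IdentityCoder(List<String> schemaFields) {
      this.schemaFields = schemaFields;
    }
  }

  /** Same state, but equals()/hashCode() compare the schema, so a deserialized copy is equal. */
  static final class ValueCoder implements Serializable {
    final List<String> schemaFields;

    ValueCoder(List<String> schemaFields) {
      this.schemaFields = schemaFields;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof ValueCoder && schemaFields.equals(((ValueCoder) o).schemaFields);
    }

    @Override
    public int hashCode() {
      return Objects.hash(schemaFields);
    }
  }

  /** Java-serializes and deserializes an object, then reports whether the copy equals the original. */
  static boolean equalAfterRoundTrip(Serializable original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return original.equals(in.readObject());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("c1", "c2", "c3");
    System.out.println(equalAfterRoundTrip(new IdentityCoder(fields))); // false
    System.out.println(equalAfterRoundTrip(new ValueCoder(fields))); // true
  }
}
```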
[jira] [Created] (BEAM-6772) Select transform has non-intuitive semantics
Reuven Lax created BEAM-6772: Summary: Select transform has non-intuitive semantics Key: BEAM-6772 URL: https://issues.apache.org/jira/browse/BEAM-6772 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Reuven Lax Consider the following schemas: User { name: STRING, location: Location } and Location { latitude: DOUBLE, longitude: DOUBLE }. If you apply Select.fieldNames("location"), most users expect to get back a row matching the Location schema. Instead, you get back an outer schema with a single location field in it. Select should instead unnest the output up to the point where multiple fields are selected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
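A sketch contrasting the two behaviours, using nested java.util.Map values as stand-in rows. This is not Beam's Row or Select API; selectWrapped and selectUnnested are hypothetical names, and only top-level field names are handled. A single selected field is returned unwrapped, while several selected fields still produce an outer row.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SelectUnnestSketch {

  /** Behaviour described in the issue: the selection is always wrapped in an outer row. */
  static Map<String, Object> selectWrapped(Map<String, Object> row, List<String> fields) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String f : fields) {
      out.put(f, row.get(f));
    }
    return out;
  }

  /** Proposed behaviour: a single selected field is unnested; several fields still form an outer row. */
  static Object selectUnnested(Map<String, Object> row, List<String> fields) {
    return fields.size() == 1 ? row.get(fields.get(0)) : selectWrapped(row, fields);
  }

  public static void main(String[] args) {
    Map<String, Object> location = new LinkedHashMap<>();
    location.put("latitude", 1.5);
    location.put("longitude", 2.5);
    Map<String, Object> user = new LinkedHashMap<>();
    user.put("name", "ann");
    user.put("location", location);

    System.out.println(selectWrapped(user, Arrays.asList("location")));
    System.out.println(selectUnnested(user, Arrays.asList("location")));
  }
}
```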
[jira] [Created] (BEAM-6756) Support lazy iterables in schemas
Reuven Lax created BEAM-6756: Summary: Support lazy iterables in schemas Key: BEAM-6756 URL: https://issues.apache.org/jira/browse/BEAM-6756 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Reuven Lax The iterables returned by GroupByKey and CoGroupByKey are lazy; this allows a runner to page data into memory if the full iterable is too large. We currently don't support this in Schemas, so the Schema Group and CoGroup transforms materialize all data into memory. We should add support for this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6753) Create proto representation for schemas
Reuven Lax created BEAM-6753: Summary: Create proto representation for schemas Key: BEAM-6753 URL: https://issues.apache.org/jira/browse/BEAM-6753 Project: Beam Issue Type: Sub-task Components: beam-model Reporter: Reuven Lax Assignee: Reuven Lax -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6701) Create LogicalType for Schema fields
Reuven Lax created BEAM-6701: Summary: Create LogicalType for Schema fields Key: BEAM-6701 URL: https://issues.apache.org/jira/browse/BEAM-6701 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Reuven Lax This will allow users to create their own logical types to store in schema fields, backed by one of the fundamental schema field types. Today SQL hacks on top of the field metadata to distinguish its types. LogicalTypes would allow for a more principled way of doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
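A sketch of what such a logical type could look like, assuming a small interface that names the type and converts values to and from a fundamental base type. The interface, its method names and the "example:date" identifier are assumptions made for illustration, not Beam's API. The example stores a java.time.LocalDate as INT64 days since the epoch, so the type carries its own identity instead of relying on field metadata.

```java
import java.time.LocalDate;

class LogicalTypeSketch {

  /** Hypothetical shape of a logical type: a user-facing Java type stored as a fundamental base type. */
  interface LogicalType<InputT, BaseT> {
    String identifier(); // stable name, instead of a tag hidden in field metadata

    String baseTypeName(); // which fundamental schema type holds the value

    BaseT toBaseType(InputT input);

    InputT toInputType(BaseT base);
  }

  /** Example: a calendar date stored as INT64 days since 1970-01-01. */
  static final class DateAsEpochDays implements LogicalType<LocalDate, Long> {
    @Override
    public String identifier() {
      return "example:date";
    }

    @Override
    public String baseTypeName() {
      return "INT64";
    }

    @Override
    public Long toBaseType(LocalDate input) {
      return input.toEpochDay();
    }

    @Override
    public LocalDate toInputType(Long base) {
      return LocalDate.ofEpochDay(base);
    }
  }

  public static void main(String[] args) {
    DateAsEpochDays date = new DateAsEpochDays();
    long stored = date.toBaseType(LocalDate.of(2019, 3, 1));
    System.out.println(date.identifier() + " stored as " + date.baseTypeName() + ": " + stored);
    System.out.println("read back: " + date.toInputType(stored));
  }
}
```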
[jira] [Created] (BEAM-6700) isSubType isSuperType methods do not belong in Schema.FieldType
Reuven Lax created BEAM-6700: Summary: isSubType isSuperType methods do not belong in Schema.FieldType Key: BEAM-6700 URL: https://issues.apache.org/jira/browse/BEAM-6700 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax Assignee: Reuven Lax These methods encode logic specific to the Cast transform, so they belong in the Cast transform. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6675) The JdbcIO sink should accept schemas
Reuven Lax created BEAM-6675: Summary: The JdbcIO sink should accept schemas Key: BEAM-6675 URL: https://issues.apache.org/jira/browse/BEAM-6675 Project: Beam Issue Type: Sub-task Components: io-java-jdbc Reporter: Reuven Lax If the input has a schema, there should be a default mapping to a PreparedStatement for writing based on that schema. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
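A sketch of the default mapping the issue suggests: derive a parameterized INSERT from the schema's field order and bind each row's values in that same order. JdbcSchemaMappingSketch, insertStatement and bind are hypothetical helpers, not JdbcIO API. The sketch assumes field names are valid column names (no quoting or escaping is done) and relies on setObject for type conversion.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

class JdbcSchemaMappingSketch {

  /** Builds a parameterized INSERT whose columns follow the schema's field order. */
  static String insertStatement(String table, List<String> fieldNames) {
    StringJoiner columns = new StringJoiner(", ");
    StringJoiner placeholders = new StringJoiner(", ");
    for (String field : fieldNames) {
      columns.add(field);
      placeholders.add("?");
    }
    return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
  }

  /** Binds one row's values in the same field order; JDBC parameter indexes start at 1. */
  static void bind(PreparedStatement statement, List<Object> rowValues) throws SQLException {
    for (int i = 0; i < rowValues.size(); i++) {
      statement.setObject(i + 1, rowValues.get(i));
    }
  }

  public static void main(String[] args) {
    System.out.println(insertStatement("users", Arrays.asList("name", "age")));
  }
}
```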