RenameFields behaves differently in DirectRunner

2021-05-31 Thread Matthew Ouyang
I’m trying to use the RenameFields transform to rename nested fields prior
to inserting into BigQuery.  Insertion into BigQuery succeeds with
DirectRunner, but DataflowRunner has an issue with the renamed nested
fields.  The error message I’m receiving: "Error while reading data, error
message: JSON parsing error in row starting at position 0: No such field:
nestedField.field1_0" suggests that BigQuery is trying to use the original
name for the nested field and not the substitute name.

The code for RenameFields seems simple enough, but does it behave
differently in different runners?  Will a deep attachValues be necessary in
order to get the nested renames to work across all runners?  Is there
something wrong in my code?

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186

The unit tests also seem to be disabled for this transform, so I don’t
know whether the PTransform behaves as expected.

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67

https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java

package ca.loblaw.cerebro.PipelineControl;

import com.google.api.services.bigquery.model.TableReference;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.RenameFields;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;

public class BQRenameFields {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        options.setFilesToStage(
            Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
                .map(entry -> (new File(entry)).toString())
                .collect(Collectors.toList()));

        Pipeline pipeline = Pipeline.create(options);

        Schema nestedSchema = Schema.builder()
            .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
            .build();
        Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
        Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
        Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
        Schema rowSchema = Schema.builder()
            .addFields(field, nested, runner)
            .build();
        Row testRow = Row.withSchema(rowSchema).attachValues(
            "value0_0",
            Row.withSchema(nestedSchema).attachValues("value1_0"),
            options.getRunner().toString());

        pipeline
            .apply(Create.of(testRow).withRowSchema(rowSchema))
            .apply(RenameFields.create()
                .rename("field0_0", "stringField")
                .rename("field0_1", "nestedField")
                .rename("field0_1.field1_0", "nestedStringField")
                .rename("field0_2", "runner"))
            .apply(BigQueryIO.write()
                .to(new TableReference()
                    .setProjectId("lt-dia-lake-exp-raw")
                    .setDatasetId("prototypes")
                    .setTableId("matto_renameFields"))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withSchemaUpdateOptions(new HashSet<>(asList(
                    BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)))
                .useBeamSchema());
        pipeline.run();
    }
}
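In case it helps illustrate what I mean by a deep rename: a rough, untested
sketch that sidesteps RenameFields by rebuilding each Row against a schema
that already carries the new names (field names are the ones from the
pipeline above; MapElements/TypeDescriptor usage is my assumption of the
simplest way to do it):

```java
// Untested workaround sketch: build the renamed schemas up front...
Schema renamedNested = Schema.builder()
    .addField(Schema.Field.nullable("nestedStringField", Schema.FieldType.STRING))
    .build();
Schema renamedSchema = Schema.builder()
    .addField(Schema.Field.nullable("stringField", Schema.FieldType.STRING))
    .addField(Schema.Field.nullable("nestedField", Schema.FieldType.row(renamedNested)))
    .addField(Schema.Field.nullable("runner", Schema.FieldType.STRING))
    .build();

// ...then copy every value (including the nested Row) into the new schema,
// so the nested Row can no longer carry the stale field name.
pipeline
    .apply(Create.of(testRow).withRowSchema(rowSchema))
    .apply(MapElements.into(TypeDescriptor.of(Row.class)).via(row ->
        Row.withSchema(renamedSchema).attachValues(
            row.getString("field0_0"),
            Row.withSchema(renamedNested)
                .attachValues(row.getRow("field0_1").getString("field1_0")),
            row.getString("field0_2"))))
    .setRowSchema(renamedSchema);
    // ...followed by the same BigQueryIO.write() as above
```

If the DataflowRunner problem is really that RenameFields leaves the nested
Row attached to its old schema, this kind of deep copy should be runner-independent.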


Re: [DISCUSS] Drop support for Flink 1.10

2021-05-31 Thread David Morávek
Hi,

+1 as we've agreed to keep support for three latest major releases in the
past

D.

On Mon, May 31, 2021 at 9:54 AM Jan Lukavský  wrote:

> Hi,
>
> +1 to remove the support for 1.10.
>
>  Jan
> On 5/28/21 10:00 PM, Ismaël Mejía wrote:
>
> Hello,
>
> With Beam support for Flink 1.13 just merged it is the time to discuss the
> end of
> support for Flink 1.10 following the agreed policy on supporting only the
> latest
> three Flink releases [1].
>
> I would like to propose that for Beam 2.31.0 we stop supporting Flink 1.10
> [2].
> I prepared a PR for this [3] but of course I wanted to bring the subject
> here
> (and to user@) for your attention and in case someone has a different
> opinion or
> reason to still support the older version.
>
> WDYT?
>
> Regards,
> Ismael
>
> [1]
> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
> [2] https://issues.apache.org/jira/browse/BEAM-12281
> [3] https://github.com/apache/beam/pull/14906
>
>


Re: No filesystem found for scheme hdfs

2021-05-31 Thread Ismaël Mejía
You probably need to include the beam-sdks-java-io-hadoop-file-system
module.
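For the Java SDK that is an extra dependency on the classpath, e.g. with
Maven (the version number here is only an example; use your Beam version):

```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
  <version>2.29.0</version><!-- match your Beam version -->
</dependency>
```

You will likely also need to hand the Hadoop configuration to the pipeline
(via HadoopFileSystemOptions) so the hdfs scheme gets registered with your
cluster's settings.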


On Mon, May 31, 2021 at 11:41 AM Gershi, Noam  wrote:

> Hi
>
> I am using Spark-runner, and when I am using Apache Beam TextIO to read a
> file from HDFS:
>
> .apply(TextIO.read().from("hdfs://path-to-file"))
>
> I get:
>
> Caused by: java.lang.IllegalArgumentException: No filesystem found for
> scheme hdfs
>
>  at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
>
>  at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:120)
>
>  at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:141)
>
>  at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:153)
>
>  at
> org.apache.beam.sdk.io.FileBasedSource.split(FileBasedSource.java:262)
>
>  at
> org.apache.beam.runners.spark.io.SourceRDD$Bounded.getPartitions(SourceRDD.java:115)
>
> What is missing to read files from hdfs?
>
> Noam Gershi
>
> ICG Technology – TLV Lab


No filesystem found for scheme hdfs

2021-05-31 Thread Gershi, Noam
Hi

I am using Spark-runner, and when I am using Apache Beam TextIO to read a file 
from HDFS:

.apply(TextIO.read().from("hdfs://path-to-file"))

I get:

Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme 
hdfs
 at 
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
 at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:120)
 at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:141)
 at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:153)
 at org.apache.beam.sdk.io.FileBasedSource.split(FileBasedSource.java:262)
 at 
org.apache.beam.runners.spark.io.SourceRDD$Bounded.getPartitions(SourceRDD.java:115)


What is missing to read files from hdfs ?


Noam Gershi
ICG Technology - TLV Lab




Re: [DISCUSS] Drop support for Flink 1.10

2021-05-31 Thread Jan Lukavský

Hi,

+1 to remove the support for 1.10.

 Jan

On 5/28/21 10:00 PM, Ismaël Mejía wrote:

Hello,

With Beam support for Flink 1.13 just merged it is the time to discuss the
end of support for Flink 1.10 following the agreed policy on supporting
only the latest three Flink releases [1].

I would like to propose that for Beam 2.31.0 we stop supporting Flink 1.10 [2].
I prepared a PR for this [3] but of course I wanted to bring the subject here
(and to user@) for your attention and in case someone has a different opinion
or reason to still support the older version.

WDYT?

Regards,
Ismael

[1] 
https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E 

[2] https://issues.apache.org/jira/browse/BEAM-12281 

[3] https://github.com/apache/beam/pull/14906