Hi Noam,

Currently, Beam doesn't support conversion of HCatRecords to Rows (or, in your case, creating a Beam Schema from the Hive table schema) when the Hive table has parameterized types, such as the decimal(30,16) column in your stack trace.

We can use HCatFieldSchema [1] to create the Beam Schema from the Hive table schema. I have created a JIRA ticket to track this issue: https://issues.apache.org/jira/browse/BEAM-9840

[1]: https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java#L34
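To illustrate the direction, here is a rough, untested sketch of how such a mapping could look. The helper names toBeamField/toBeamSchema are mine, not Beam's, and note that Beam's Schema.FieldType.DECIMAL models an unbounded BigDecimal, so the precision/scale of decimal(30,16) would not be carried over:

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;

    class HCatToBeamSchemaSketch {

      // Map a single HCatFieldSchema to a Beam field using the field's
      // PrimitiveTypeInfo rather than the raw type string, so parameterized
      // types like decimal(30,16) resolve to their base category.
      static Schema.Field toBeamField(HCatFieldSchema field) {
        PrimitiveTypeInfo typeInfo = field.getTypeInfo();
        if (typeInfo == null) {
          // Complex types (struct/array/map) are out of scope for this sketch.
          throw new UnsupportedOperationException(
              "Field '" + field.getName() + "' is not a primitive type.");
        }
        switch (typeInfo.getPrimitiveCategory()) {
          case DECIMAL:
            // Beam's DECIMAL is an unbounded BigDecimal; precision/scale are dropped here.
            return Schema.Field.of(field.getName(), Schema.FieldType.DECIMAL);
          case CHAR:
          case VARCHAR:
            // char(n)/varchar(n) also carry parameters; map them to STRING.
            return Schema.Field.of(field.getName(), Schema.FieldType.STRING);
          default:
            throw new UnsupportedOperationException(
                "The type '" + typeInfo.getTypeName() + "' is not handled in this sketch.");
        }
      }

      // Build a Beam Schema from the table's full HCatSchema.
      static Schema toBeamSchema(HCatSchema hCatSchema) {
        Schema.Builder builder = Schema.builder();
        for (HCatFieldSchema field : hCatSchema.getFields()) {
          builder.addField(toBeamField(field));
        }
        return builder.build();
      }
    }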
PS: I am working on supporting this; the feature should land in a future Apache Beam release.

Regards,
Rahul

On Mon, Apr 27, 2020 at 6:57 PM Gershi, Noam <[email protected]> wrote:

> Hi
>
> Using HCatalogIO as a source - I am trying to read column tables.
>
> Code:
>
> PCollection<HCatRecord> hcatRecords = input
>     .apply(HCatalogIO.read()
>         .withConfigProperties(configProperties)
>         .withDatabase("db-name")
>         .withTable("my-table-name"));
>
> ...
>
> HCatalogBeamSchema hcatSchema =
>     HCatalogBeamSchema.create(ImmutableMap.of("table", "my-table-name"));
> Schema schema = hcatSchema.getTableSchema("db-name", "my-table-name").get();
> List<Schema.Field> fields = schema.getFields();
>
> I get:
>
> 20/04/27 09:12:16 INFO LineBufferedStream: Caused by: java.lang.UnsupportedOperationException: The type 'decimal(30,16)' of field 'amount' is not supported.
> 20/04/27 09:12:16 INFO LineBufferedStream:     at org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamField(SchemaUtils.java:60)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema(SchemaUtils.java:53)
> 20/04/27 09:12:16 INFO LineBufferedStream:     at org.apache.beam.sdk.io.hcatalog.HCatalogBeamSchema.getTableSchema(HCatalogBeamSchema.java:83)
>
> Thanx in advance,
>
> Noam
