I used HCatalogIO:
Map<String, String> configProperties = new HashMap<>();
configProperties.put("hive.metastore.uris", "....");

pipeline
    .apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase(DB_NAME)
        .withTable(TEST_TABLE))
    // Render each HCatRecord as its string form.
    .apply("to-string", MapElements.via(new SimpleFunction<HCatRecord, String>() {
      @Override
      public String apply(HCatRecord input) {
        return input.toString();
      }
    }))
    // Write everything to a single output file.
    .apply(TextIO.write().to("my-logs/output.txt").withoutSharding());
But I am getting this error:
20/02/11 07:49:24 INFO spark.SparkContext: Starting job: collect at BoundedDataset.java:93
Exception in thread "dag-scheduler-event-loop" java.lang.NoSuchMethodError: org.apache.hive.hcatalog.common.HCatUtil.getHiveMetastoreClient(Lorg/apache/hadoop/hive/conf/HiveConf;)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
    at org.apache.beam.sdk.io.hcatalog.HCatalogUtils.createMetaStoreClient(HCatalogUtils.java:42)
    at org.apache.beam.sdk.io.hcatalog.HCatalogIO$BoundedHCatalogSource.getEstimatedSizeBytes(HCatalogIO.java:323)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded.getPartitions(SourceRDD.java:104)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    [... the previous five frames repeat nine more times ...]
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
    at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:238)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.dependencies(RDD.scala:238)
    at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:512)
    at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:461)
    at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:448)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:962)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2067)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
20/02/11 07:52:54 INFO util.JsonImpl: Shutting down HTTP client
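A NoSuchMethodError like this usually means the HCatUtil class loaded at runtime comes from a different Hive version than the one Beam's HCatalog module was compiled against (the getHiveMetastoreClient(HiveConf) signature is not present in older Hive lines, so the cluster's Hive jars may be shadowing the ones bundled with the job). A quick diagnostic sketch, not part of the pipeline, just to see which jar wins on the Spark classpath:

// Diagnostic sketch: print the jar that actually provides HCatUtil at runtime,
// to spot a version conflict between the cluster's Hive and Beam's Hive deps.
System.out.println(
    org.apache.hive.hcatalog.common.HCatUtil.class
        .getProtectionDomain().getCodeSource().getLocation());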
From: Gershi, Noam [ICG-IT]
Sent: Tuesday, February 11, 2020 11:08 AM
To: [email protected]
Subject: RE: Apache Beam with Hive
Thanx.
Looks like HCatalog could work.
But is there an example with a 'SELECT' query?
JdbcIO is probably not a good fit for me, since my Spark cluster is already configured to work with Hive. So when I code Spark pipelines, I can write queries without needing to supply a user/password. I would like to have something similar in Apache Beam.
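Worth noting: HCatalogIO reads whole tables rather than arbitrary SELECT statements; the closest built-in option is a partition filter via withFilter. A minimal sketch, assuming a partitioned table (the partition column and value are illustrative):

pipeline.apply(HCatalogIO.read()
    .withConfigProperties(configProperties)
    .withDatabase(DB_NAME)
    .withTable(TEST_TABLE)
    // Partition predicate pushed to the metastore, not a full SQL WHERE clause.
    .withFilter("ds=\"2020-02-11\""));  // illustrative partition column/value

Column projection or row-level filtering would then be done in the pipeline itself, e.g. in a MapElements or ParDo step after the read.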
From: [email protected]
Sent: Tuesday, February 11, 2020 10:26 AM
To: [email protected]
Subject: Re: Apache Beam with Hive
While I've not created Hive tables using this, reading and writing worked very well using HCatalogIO:
https://beam.apache.org/releases/javadoc/2.18.0/org/apache/beam/sdk/io/hcatalog/HCatalogIO.html
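For the write side, a rough sketch modeled on that javadoc (database, table, and partition values here are assumptions):

pipeline
    .apply(...)  // some PTransform producing a PCollection<HCatRecord>
    .apply(HCatalogIO.write()
        .withConfigProperties(configProperties)  // e.g. hive.metastore.uris
        .withDatabase("my_db")                   // assumed; "default" if unset
        .withTable("my_table")                   // assumed table name
        .withPartition(partitionMap)             // optional Map<String, String>
        .withBatchSize(1024L));                  // optional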
On Tue, Feb 11, 2020 at 11:38 AM Jean-Baptiste Onofre <[email protected]> wrote:
Hi,
If you are OK with using Hive via JDBC, you can use JdbcIO; see the example in the javadoc:
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L82
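Adapted for Hive, that pattern would look roughly like this (driver class, connection URL, and query are assumptions; the Hive JDBC driver must be on the classpath):

PCollection<String> rows = pipeline.apply(JdbcIO.<String>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.apache.hive.jdbc.HiveDriver",            // Hive JDBC driver
            "jdbc:hive2://hive-host:10000/default")       // assumed HiveServer2 URL
        .withUsername("user")                             // may be unnecessary on
        .withPassword("password"))                        // Kerberized clusters
    .withQuery("SELECT col1 FROM my_table")               // assumed query
    .withRowMapper((JdbcIO.RowMapper<String>) rs -> rs.getString(1))
    .withCoder(StringUtf8Coder.of()));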
Regards
JB
On 11 Feb 2020, at 07:03, Gershi, Noam <[email protected]> wrote:
Hi,
I am searching for a detailed example of how to use Apache Beam with Hive and/or HCatalog:
creating tables, inserting data and fetching it…
Noam Gershi
Software Developer
ICG Technology – TLV Lab
T: +972 (3) 7405718