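Hi Wldd,
Hive is accessible. I have pasted the contents of the two earlier gists into this email; please take a look. They include the results returned by the Hive queries.
Let me know if you need any other information.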
======== flink insert into hive error ========
org.apache.flink.table.api.TableException: Exception in close
    at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
    at org.apache.hadoop.ipc.Client.call(Client.java:1413)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
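A note on reading the "Caused by" message in the trace above: the NameNode reports one live datanode, and that same node was excluded for this write. In HDFS a node is usually excluded after the client failed to open a connection to it, so this pattern often points at client-to-datanode connectivity rather than at a dead datanode, although the trace alone does not prove that. The Python sketch below only pulls the counts out of the message; the function name and the returned field names are hypothetical and not part of Hadoop or Flink.

```python
import re

# Pattern for the NameNode message quoted in the trace above.
_PATTERN = re.compile(
    r"could only be replicated to (\d+) nodes instead of minReplication \(=(\d+)\)\.\s+"
    r"There are (\d+) datanode\(s\) running and (\d+) node\(s\) are excluded"
)


def summarize_replication_error(message: str):
    """Return the counts found in the message, or None if it does not match."""
    # Collapse line wrapping so a mail-wrapped trace still matches.
    flat = " ".join(message.split())
    match = _PATTERN.search(flat)
    if match is None:
        return None
    replicated, min_replication, running, excluded = map(int, match.groups())
    return {
        "replicated": replicated,
        "min_replication": min_replication,
        "running": running,
        "excluded": excluded,
        # True when every live datanode was excluded for this write.
        "all_datanodes_excluded": running > 0 and excluded >= running,
    }
```

For the message in this email the sketch reports that all live datanodes were excluded, which is the case worth checking from the client side.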
======== Flink 1.10.0 lib directory ========
mysql-connector-java-5.1.48.jar
slf4j-log4j12-1.7.15.jar
log4j-1.2.17.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
flink-dist_2.11-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar
flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
hive-exec-2.3.7.jar
flink-csv-1.10.1.jar
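One detail in this listing: flink-csv is at 1.10.1 while the other Flink jars are at 1.10.0. That may well be unrelated to the HDFS error, but mixed versions in lib are worth ruling out. A minimal Python sketch for grouping the flink-* jars by version follows; the function name is hypothetical, and it assumes the Flink version is the trailing x.y.z before ".jar".

```python
import re
from collections import defaultdict


def group_flink_jar_versions(jar_names):
    """Map each Flink version found in flink-*.jar names to the jars carrying it."""
    versions = defaultdict(list)
    for name in jar_names:
        # Skip non-Flink jars and the shaded Hadoop bundle, whose suffix
        # is not a Flink release version.
        if not name.startswith("flink-") or name.startswith("flink-shaded-"):
            continue
        # Assumption: the trailing x.y.z before ".jar" is the Flink version.
        match = re.search(r"-(\d+\.\d+\.\d+)\.jar$", name)
        if match:
            versions[match.group(1)].append(name)
    return dict(versions)
```

If the result has more than one key, the lib directory mixes Flink versions.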
======== Hive table "pokes" ========
❯ docker-compose exec hive-server bash
root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 2.3.2)
Driver: Hive JDBC (version 2.3.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.2 by Apache Hive
0: jdbc:hive2://localhost:10000> describe formatted pokes;
+-------------------------------+----------------------------------------------------+-----------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+-----------------------+
| # col_name | data_type | comment |
| | NULL | NULL |
| foo | int | |
| bar | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | root | NULL |
| CreateTime: | Tue May 26 05:42:30 UTC 2020 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://namenode:8020/user/hive/warehouse/pokes | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | numFiles | 4 |
| | numRows | 0 |
| | rawDataSize | 0 |
| | totalSize | 5839 |
| | transient_lastDdlTime | 1590480090 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | serialization.format | 1 |
+-------------------------------+----------------------------------------------------+-----------------------+
30 rows selected (0.328 seconds)
0: jdbc:hive2://localhost:10000>
0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
+------------+------------+
| pokes.foo | pokes.bar |
+------------+------------+
| 25 | Tommy |
| 26 | Tommy |
| 27 | Tommy |
| 238 | val_238 |
| 86 | val_86 |
| 311 | val_311 |
| 27 | val_27 |
| 165 | val_165 |
| 409 | val_409 |
| 255 | val_255 |
+------------+------------+
10 rows selected (0.622 seconds)
0: jdbc:hive2://localhost:10000>
======== Hive table "pokes" in Flink ========
Flink SQL> describe pokes;
root
|-- foo: INT
|-- bar: STRING
======== hadoop/hive environment ========
version: "3"
services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    ports:
      - "5432:5432"
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"
volumes:
  namenode:
  datanode:
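A possible lead in this compose file: the datanode publishes only its HTTP port 50075, not its data transfer port (50010 by default in Hadoop 2.x). If the Flink SQL client runs on the host rather than inside the Docker network, it may not be able to reach the datanode at the address the namenode hands out, which would be consistent with the "excluded" node in the error. This is an assumption, not a confirmed diagnosis. The Python sketch below is a plain TCP probe to run from the machine where Flink runs; the function name is hypothetical.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the name and opens the socket; both
        # resolution failures and refused connections raise OSError subclasses.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, can_connect("localhost", 8020) should succeed with the port mapping above, while can_connect("datanode", 50010) would show whether the data transfer port is reachable from the host; the host name and port in that second call are assumptions about this setup.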
======== hive-site.xml ========
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>true</value>
</property>
</configuration>
======== sql-client-defaults.yaml ========
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.
# See the Table API & SQL documentation for details about supported properties.
#==============================================================================
# Tables
#==============================================================================
# Define tables here such as sources, sinks, views, or temporal tables.
tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
# type: source-table
# connector: ...
# format: ...
# schema: ...
# A typical view definition looks like:
# - name: ...
# type: view
# query: "SELECT ..."
# A typical temporal table definition looks like:
# - name: ...
# type: temporal-table
# history-table: ...
# time-attribute: ...
# primary-key: ...
#==============================================================================
# User-defined functions
#==============================================================================
# Define scalar, aggregate, or table functions here.
functions: [] # empty list
# A typical function definition looks like:
# - name: ...
# from: class
# class: ...
# constructor: ...
#==============================================================================
# Catalogs
#==============================================================================
# Define catalogs here.
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
    hive-version: 2.3.2
#==============================================================================
# Modules
#==============================================================================
# Define modules here.
#modules: # note the following modules will be of the order they are specified
# - name: core
# type: core
#==============================================================================
# Execution properties
#==============================================================================
# Properties that change the fundamental execution behavior of a table program.
execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  current-database: default_database
  # controls how table programs are restarted in case of a failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback
#==============================================================================
# Configuration options
#==============================================================================
# Configuration options for adjusting and tuning table programs.
# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.
# A configuration can look like:
# configuration:
# table.exec.spill-compression.enabled: true
# table.exec.spill-compression.block-size: 128kb
# table.optimizer.join-reorder-enabled: true
#==============================================================================
# Deployment properties
#==============================================================================
# Properties that describe the cluster to which table programs are submitted to.
deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0
Cheers,
Enzo
On Tue, 26 May 2020 at 17:15, wldd <[email protected]> wrote:
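> Hi, Enzo wang
> The image does not load and the GitHub link is not accessible either. Could you check whether Hive itself can read and write the table normally?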
>
> --
>
> Best,
> wldd
>
>
>
>
> On 2020-05-26 17:01:32, "Enzo wang" <[email protected]> wrote:
>
> Hi Wldd,
>
>
> Thanks for the reply.
>
>
> 1. The datanode is available
>
>
> ❯ docker-compose exec namenode hadoop fs -ls /tmp
> Found 1 items
> drwx-wx-wx - root supergroup 0 2020-05-26 05:40 /tmp/hive
>
>
> It also shows up in the namenode web UI:
>
>
>
>
> 2. After running set execution.type=batch; the statement fails with the following error:
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> File
> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
> could only be replicated to 0 nodes instead of minReplication (=1). There
> are 1 datanode(s) running and 1 node(s) are excluded in this operation.
>
>
> Full error:
> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
>
>
>
> On Tue, 26 May 2020 at 16:52, wldd <[email protected]> wrote:
>
> Problem 1:
>
> org.apache.hadoop.hdfs.BlockMissingException: use the hadoop fs
> command to check whether that datanode is accessible.
>
>
> Problem 2:
> Writing to Hive requires batch mode: set execution.type=batch;
>
> On 2020-05-26 16:42:12, "Enzo wang" <[email protected]> wrote:
>
> Hi Flink group,
>
>
> While going through the Flink and Hive integration today I ran into a few problems, and I would appreciate your help.
> Reference:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
>
>
> Version and table schema information is here: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
>
>
> Problem 1: Reading the Hive table pokes from Flink SQL fails
>
>
> Flink SQL> select * from pokes;
> 2020-05-26 16:12:11,439 INFO org.apache.hadoop.mapred.FileInputFormat
> - Total input paths to process : 4
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
> file=/user/hive/warehouse/pokes/kv1.txt
>
> Problem 2: Writing to the Hive table pokes from Flink SQL fails
>
>
> Flink SQL> insert into pokes select 12,'tom';
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
>
> Cheers,
> Enzo