Hi Wldd,

Hive 是可以访问的,我把之前的2个gist内容贴在这个邮件里,你看一下,里面有hive查询数据的返回结果。

还需要什么信息我再提供。



========  flink insert into hive error ========

org.apache.flink.table.api.TableException: Exception in close
        at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
        at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
File 
/user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
could only be replicated to 0 nodes instead of minReplication (=1).
There are 1 datanode(s) running and 1 node(s) are excluded in this
operation.
        at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

        at org.apache.hadoop.ipc.Client.call(Client.java:1476)
        at org.apache.hadoop.ipc.Client.call(Client.java:1413)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)

======== Flink 1.10.0 的lib目录 ========

mysql-connector-java-5.1.48.jar
slf4j-log4j12-1.7.15.jar
log4j-1.2.17.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
flink-dist_2.11-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar
flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
hive-exec-2.3.7.jar
flink-csv-1.10.1.jar


======== Hive table "pokes" ========

❯ docker-compose exec hive-server bash
root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 2.3.2)
Driver: Hive JDBC (version 2.3.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.2 by Apache Hive
0: jdbc:hive2://localhost:10000> describe formatted pokes;
+-------------------------------+----------------------------------------------------+-----------------------+
|           col_name            |                     data_type
              |        comment        |
+-------------------------------+----------------------------------------------------+-----------------------+
| # col_name                    | data_type
              | comment               |
|                               | NULL
              | NULL                  |
| foo                           | int
              |                       |
| bar                           | string
              |                       |
|                               | NULL
              | NULL                  |
| # Detailed Table Information  | NULL
              | NULL                  |
| Database:                     | default
              | NULL                  |
| Owner:                        | root
              | NULL                  |
| CreateTime:                   | Tue May 26 05:42:30 UTC 2020
              | NULL                  |
| LastAccessTime:               | UNKNOWN
              | NULL                  |
| Retention:                    | 0
              | NULL                  |
| Location:                     |
hdfs://namenode:8020/user/hive/warehouse/pokes     | NULL
    |
| Table Type:                   | MANAGED_TABLE
              | NULL                  |
| Table Parameters:             | NULL
              | NULL                  |
|                               | numFiles
              | 4                     |
|                               | numRows
              | 0                     |
|                               | rawDataSize
              | 0                     |
|                               | totalSize
              | 5839                  |
|                               | transient_lastDdlTime
              | 1590480090            |
|                               | NULL
              | NULL                  |
| # Storage Information         | NULL
              | NULL                  |
| SerDe Library:                |
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL
    |
| InputFormat:                  |
org.apache.hadoop.mapred.TextInputFormat           | NULL
    |
| OutputFormat:                 |
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL
            |
| Compressed:                   | No
              | NULL                  |
| Num Buckets:                  | -1
              | NULL                  |
| Bucket Columns:               | []
              | NULL                  |
| Sort Columns:                 | []
              | NULL                  |
| Storage Desc Params:          | NULL
              | NULL                  |
|                               | serialization.format
              | 1                     |
+-------------------------------+----------------------------------------------------+-----------------------+
30 rows selected (0.328 seconds)
0: jdbc:hive2://localhost:10000>

0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
+------------+------------+
| pokes.foo  | pokes.bar  |
+------------+------------+
| 25         | Tommy      |
| 26         | Tommy      |
| 27         | Tommy      |
| 238        | val_238    |
| 86         | val_86     |
| 311        | val_311    |
| 27         | val_27     |
| 165        | val_165    |
| 409        | val_409    |
| 255        | val_255    |
+------------+------------+
10 rows selected (0.622 seconds)
0: jdbc:hive2://localhost:10000>


======== Hive table "pokes" in Flink ========

Flink SQL> describe pokes;
root
 |-- foo: INT
 |-- bar: STRING


======== hadoop/hive 环境 ========

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL:
"jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075
hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    ports:
      - "5432:5432"
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"

volumes:
  namenode:
  datanode:


======== hive-site.xml ========

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        
<value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.postgresql.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>
   <property>
       <name>hive.metastore.schema.verification</name>
       <value>true</value>
   </property>
</configuration>


======== sql-client-defaults.yaml ========

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.


# See the Table API & SQL documentation for details about supported properties.


#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...


#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...


#==============================================================================
# Catalogs
#==============================================================================

# Define catalogs here.

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
    hive-version: 2.3.2


#==============================================================================
# Modules
#==============================================================================

# Define modules here.

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: default_catalog
  # current database of the current catalog (default database of the
catalog by default)
  current-database: default_database
  # controls how table programs are restarted in case of a failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or
"fallback" (default)
    type: fallback

#==============================================================================
# Configuration options
#==============================================================================

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.

# A configuration can look like:
# configuration:
#   table.exec.spill-compression.enabled: true
#   table.exec.spill-compression.block-size: 128kb
#   table.optimizer.join-reorder-enabled: true

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0





Cheers,
Enzo


On Tue, 26 May 2020 at 17:15, wldd <[email protected]> wrote:

> Hi,Enzo wang
> 图片无法加载,github地址也无法访问,你可以试一下hive可以正常读写表么
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd
>
>
>
>
> 在 2020-05-26 17:01:32,"Enzo wang" <[email protected]> 写道:
>
> Hi Wldd,
>
>
> 谢谢回复。
>
>
> 1.  datanode 是可用的
>
>
> ❯ docker-compose exec namenode hadoop fs -ls /tmp
> Found 1 items
> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
>
>
> namenode 的webui 也可以看到:
>
>
>
>
> 2.  设置set execution.type=batch; 以后,执行报错,错误如下
> Causedby: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> File
> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
> could only be replicated to 0 nodes instead of minReplication (=1). There
> are 1 datanode(s) running and1 node(s) are excluded inthis operation.
>
>
> 完整错误见:
> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
>
>
>
> On Tue, 26 May 2020 at 16:52, wldd <[email protected]> wrote:
>
> 问题1:
>
> org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs
> 命令看看那个datanode能不能访问
>
>
> 问题2:
> 写hive,需要用batch模式,set execution.type=batch;
>
>
>
>
>
>
>
> 在 2020-05-26 16:42:12,"Enzo wang" <[email protected]> 写道:
>
> Hi Flink group,
>
>
> 今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
> 参考的网址:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
>
>
> 版本、表结构信息见这里: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
>
>
> 问题1:Flink SQL 读Hive 表pokes 失败
>
>
> Flink SQL> select * from pokes;
> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat
>                   - Total input paths to process : 4
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
> file=/user/hive/warehouse/pokes/kv1.txt
>
>
>
>
>
>
>
> 问题2:Flink SQL 写Hive 表pokes 失败
>
>
> Flink SQL> insert into pokes select 12,'tom';
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
>
>
>
>
>
>
>
> Cheers,
> Enzo

回复