Hi Wldd,

I tested writing with Hive, and it works without problems.

0: jdbc:hive2://localhost:10000> select count(*) from pokes;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the
future versions. Consider using a different execution engine (i.e. spark,
tez) or using Hive 1.X releases.
+------+
| _c0  |
+------+
| 504  |
+------+
1 row selected (41.794 seconds)

0: jdbc:hive2://localhost:10000> INSERT INTO Pokes values( 200,'Kitty');
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the
future versions. Consider using a different execution engine (i.e. spark,
tez) or using Hive 1.X releases.
No rows affected (2.523 seconds)

0: jdbc:hive2://localhost:10000> select count(*) from pokes;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the
future versions. Consider using a different execution engine (i.e. spark,
tez) or using Hive 1.X releases.
+------+
| _c0  |
+------+
| 505  |
+------+
1 row selected (1.7 seconds)
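
For reference, the same check (count, insert one row, count again) could be scripted. This is only a minimal sketch: it assumes a Python DB-API 2.0 cursor connected to HiveServer2 (for example via the PyHive package, which is an assumption and was not used in this thread), and the helper names `check_hive_write` and `write_succeeded` are hypothetical.

```python
def check_hive_write(cursor, table, foo, bar):
    """Insert one row and return the row counts seen before and after.

    `cursor` is assumed to be any DB-API 2.0 style cursor, e.g. one
    connected to HiveServer2 at localhost:10000 (hypothetical setup).
    """
    count_sql = "SELECT COUNT(*) FROM {}".format(table)

    cursor.execute(count_sql)
    before = cursor.fetchone()[0]

    # Values are inlined as literals to mirror the beeline statement;
    # only use this with trusted input.
    escaped = bar.replace("'", "\\'")
    cursor.execute(
        "INSERT INTO {} VALUES ({:d}, '{}')".format(table, foo, escaped)
    )

    cursor.execute(count_sql)
    after = cursor.fetchone()[0]
    return before, after


def write_succeeded(before, after):
    """The write counts as visible if exactly one more row is counted."""
    return after == before + 1
```

With the session above, `check_hive_write(cursor, "pokes", 200, "Kitty")` would be expected to return the two counts shown by beeline, provided nothing else writes to the table in between.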

Cheers,
Enzo

On Tue, 26 May 2020 at 18:14, wldd <[email protected]> wrote:

> Have you tested writing data with Hive? Judging from the exception you provided, this looks like an HDFS problem.
>
> --
>
> Best,
> wldd
>
>
>
>
>
> On 2020-05-26 17:49:56, "Enzo wang" <[email protected]> wrote:
> >Hi Wldd,
> >
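> >Hive is accessible. I have pasted the contents of the two earlier gists into this email; please take a look. They include the results of querying data in Hive.
> >
> >Let me know if you need any other information and I will provide it.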
> >
> >
> >
> >========  flink insert into hive error ========
> >
> >org.apache.flink.table.api.TableException: Exception in close
> >       at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
> >       at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
> >       at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> >       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
> >       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >       at java.base/java.lang.Thread.run(Thread.java:830)
> >Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
> >       at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
> >       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
> >       at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
> >       at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
> >       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
> >       at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >       at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
> >       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
> >       at java.security.AccessController.doPrivileged(Native Method)
> >       at javax.security.auth.Subject.doAs(Subject.java:422)
> >       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> >       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
> >
> >       at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> >       at org.apache.hadoop.ipc.Client.call(Client.java:1413)
> >       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> >       at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
> >       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> >       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >       at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> >       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> >       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >       at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
> >       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
> >       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
> >       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
> >
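
The key line in the trace above is the NameNode message: the number of running datanodes equals the number of excluded nodes, so no datanode was left as a write target for this operation. Below is a minimal sketch that pulls those numbers out of such a message. It assumes the exact wording shown in this trace (other Hadoop versions may phrase it differently); the function name and the regular expression are illustrative and not part of Hadoop.

```python
import re

# Pattern based on the message wording in this trace (an assumption).
_REPLICATION_ERROR = re.compile(
    r"could only be replicated to (\d+) nodes "
    r"instead of minReplication \(=(\d+)\)\.\s*"
    r"There are (\d+) datanode\(s\) running "
    r"and (\d+) node\(s\) are excluded"
)


def parse_replication_error(message):
    """Return the counts from an HDFS replication error, or None if absent."""
    # Collapse whitespace so that line-wrapped log text still matches.
    text = " ".join(message.split())
    match = _REPLICATION_ERROR.search(text)
    if match is None:
        return None
    replicated, min_replication, running, excluded = map(int, match.groups())
    return {
        "replicated": replicated,
        "min_replication": min_replication,
        "running": running,
        "excluded": excluded,
        # True when every running datanode was excluded for this write.
        "all_excluded": running > 0 and excluded >= running,
    }
```

The sketch only reports what the message states; it does not explain why the datanode was excluded.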
> >======== Flink 1.10.0 lib directory ========
> >
> >mysql-connector-java-5.1.48.jar
> >slf4j-log4j12-1.7.15.jar
> >log4j-1.2.17.jar
> >flink-table_2.11-1.10.0.jar
> >flink-table-blink_2.11-1.10.0.jar
> >flink-dist_2.11-1.10.0.jar
> >flink-jdbc_2.11-1.10.0.jar
> >flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
> >flink-sql-connector-kafka_2.11-1.10.0.jar
> >flink-json-1.10.0.jar
> >flink-connector-hive_2.11-1.10.0.jar
> >flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> >hive-exec-2.3.7.jar
> >flink-csv-1.10.1.jar
> >
> >
> >======== Hive table "pokes" ========
> >
> >❯ docker-compose exec hive-server bash
> >root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
> >SLF4J: Class path contains multiple SLF4J bindings.
> >SLF4J: Found binding in [jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> >SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> >Connecting to jdbc:hive2://localhost:10000
> >Connected to: Apache Hive (version 2.3.2)
> >Driver: Hive JDBC (version 2.3.2)
> >Transaction isolation: TRANSACTION_REPEATABLE_READ
> >Beeline version 2.3.2 by Apache Hive
> >0: jdbc:hive2://localhost:10000> describe formatted pokes;
> >+-------------------------------+----------------------------------------------------+-----------------------+
> >|           col_name            |                     data_type                      |        comment        |
> >+-------------------------------+----------------------------------------------------+-----------------------+
> >| # col_name                    | data_type                                          | comment               |
> >|                               | NULL                                               | NULL                  |
> >| foo                           | int                                                |                       |
> >| bar                           | string                                             |                       |
> >|                               | NULL                                               | NULL                  |
> >| # Detailed Table Information  | NULL                                               | NULL                  |
> >| Database:                     | default                                            | NULL                  |
> >| Owner:                        | root                                               | NULL                  |
> >| CreateTime:                   | Tue May 26 05:42:30 UTC 2020                       | NULL                  |
> >| LastAccessTime:               | UNKNOWN                                            | NULL                  |
> >| Retention:                    | 0                                                  | NULL                  |
> >| Location:                     | hdfs://namenode:8020/user/hive/warehouse/pokes     | NULL                  |
> >| Table Type:                   | MANAGED_TABLE                                      | NULL                  |
> >| Table Parameters:             | NULL                                               | NULL                  |
> >|                               | numFiles                                           | 4                     |
> >|                               | numRows                                            | 0                     |
> >|                               | rawDataSize                                        | 0                     |
> >|                               | totalSize                                          | 5839                  |
> >|                               | transient_lastDdlTime                              | 1590480090            |
> >|                               | NULL                                               | NULL                  |
> >| # Storage Information         | NULL                                               | NULL                  |
> >| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                  |
> >| InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat           | NULL                  |
> >| OutputFormat:                 | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL                  |
> >| Compressed:                   | No                                                 | NULL                  |
> >| Num Buckets:                  | -1                                                 | NULL                  |
> >| Bucket Columns:               | []                                                 | NULL                  |
> >| Sort Columns:                 | []                                                 | NULL                  |
> >| Storage Desc Params:          | NULL                                               | NULL                  |
> >|                               | serialization.format                               | 1                     |
> >+-------------------------------+----------------------------------------------------+-----------------------+
> >30 rows selected (0.328 seconds)
> >0: jdbc:hive2://localhost:10000>
> >
> >0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
> >+------------+------------+
> >| pokes.foo  | pokes.bar  |
> >+------------+------------+
> >| 25         | Tommy      |
> >| 26         | Tommy      |
> >| 27         | Tommy      |
> >| 238        | val_238    |
> >| 86         | val_86     |
> >| 311        | val_311    |
> >| 27         | val_27     |
> >| 165        | val_165    |
> >| 409        | val_409    |
> >| 255        | val_255    |
> >+------------+------------+
> >10 rows selected (0.622 seconds)
> >0: jdbc:hive2://localhost:10000>
> >
> >
> >======== Hive table "pokes" in Flink ========
> >
> >Flink SQL> describe pokes;
> >root
> > |-- foo: INT
> > |-- bar: STRING
> >
> >
> >======== hadoop/hive environment ========
> >
> >version: "3"
> >
> >services:
> >  namenode:
> >    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
> >    volumes:
> >      - namenode:/hadoop/dfs/name
> >    environment:
> >      - CLUSTER_NAME=test
> >    env_file:
> >      - ./hadoop-hive.env
> >    ports:
> >      - "50070:50070"
> >      - "8020:8020"
> >  datanode:
> >    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
> >    volumes:
> >      - datanode:/hadoop/dfs/data
> >    env_file:
> >      - ./hadoop-hive.env
> >    environment:
> >      SERVICE_PRECONDITION: "namenode:50070"
> >    ports:
> >      - "50075:50075"
> >  hive-server:
> >    image: bde2020/hive:2.3.2-postgresql-metastore
> >    env_file:
> >      - ./hadoop-hive.env
> >    environment:
> >      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
> >      SERVICE_PRECONDITION: "hive-metastore:9083"
> >    ports:
> >      - "10000:10000"
> >  hive-metastore:
> >    image: bde2020/hive:2.3.2-postgresql-metastore
> >    env_file:
> >      - ./hadoop-hive.env
> >    command: /opt/hive/bin/hive --service metastore
> >    environment:
> >      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
> >    ports:
> >      - "9083:9083"
> >  hive-metastore-postgresql:
> >    image: bde2020/hive-metastore-postgresql:2.3.0
> >    ports:
> >      - "5432:5432"
> >  presto-coordinator:
> >    image: shawnzhu/prestodb:0.181
> >    ports:
> >      - "8080:8080"
> >
> >volumes:
> >  namenode:
> >  datanode:
> >
> >
> >======== hive-site.xml ========
> >
> ><configuration>
> >    <property>
> >        <name>hive.metastore.uris</name>
> >        <value>thrift://localhost:9083</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionURL</name>
> >        <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionDriverName</name>
> >        <value>org.postgresql.Driver</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionPassword</name>
> >        <value>hive</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionUserName</name>
> >        <value>hive</value>
> >    </property>
> >   <property>
> >       <name>hive.metastore.schema.verification</name>
> >       <value>true</value>
> >   </property>
> ></configuration>
> >
> >
> >======== sql-client-defaults.yaml ========
> >
>
> >################################################################################
> >#  Licensed to the Apache Software Foundation (ASF) under one
> >#  or more contributor license agreements.  See the NOTICE file
> >#  distributed with this work for additional information
> >#  regarding copyright ownership.  The ASF licenses this file
> >#  to you under the Apache License, Version 2.0 (the
> >#  "License"); you may not use this file except in compliance
> >#  with the License.  You may obtain a copy of the License at
> >#
> >#      http://www.apache.org/licenses/LICENSE-2.0
> >#
> >#  Unless required by applicable law or agreed to in writing, software
> >#  distributed under the License is distributed on an "AS IS" BASIS,
> >#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >#  See the License for the specific language governing permissions and
> ># limitations under the License.
>
> >################################################################################
> >
> >
> ># This file defines the default environment for Flink's SQL Client.
> ># Defaults might be overwritten by a session specific environment.
> >
> >
> ># See the Table API & SQL documentation for details about supported properties.
> >
> >
>
> >#==============================================================================
> ># Tables
>
> >#==============================================================================
> >
> ># Define tables here such as sources, sinks, views, or temporal tables.
> >
> >tables: [] # empty list
> ># A typical table source definition looks like:
> ># - name: ...
> >#   type: source-table
> >#   connector: ...
> >#   format: ...
> >#   schema: ...
> >
> ># A typical view definition looks like:
> ># - name: ...
> >#   type: view
> >#   query: "SELECT ..."
> >
> ># A typical temporal table definition looks like:
> ># - name: ...
> >#   type: temporal-table
> >#   history-table: ...
> >#   time-attribute: ...
> >#   primary-key: ...
> >
> >
>
> >#==============================================================================
> ># User-defined functions
>
> >#==============================================================================
> >
> ># Define scalar, aggregate, or table functions here.
> >
> >functions: [] # empty list
> ># A typical function definition looks like:
> ># - name: ...
> >#   from: class
> >#   class: ...
> >#   constructor: ...
> >
> >
>
> >#==============================================================================
> ># Catalogs
>
> >#==============================================================================
> >
> ># Define catalogs here.
> >
> >catalogs:
> >  - name: myhive
> >    type: hive
> >    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
> >    hive-version: 2.3.2
> >
> >
>
> >#==============================================================================
> ># Modules
>
> >#==============================================================================
> >
> ># Define modules here.
> >
> >#modules: # note the following modules will be of the order they are specified
> >#  - name: core
> >#    type: core
> >
>
> >#==============================================================================
> ># Execution properties
>
> >#==============================================================================
> >
> ># Properties that change the fundamental execution behavior of a table program.
> >
> >execution:
> >  # select the implementation responsible for planning table programs
> >  # possible values are 'blink' (used by default) or 'old'
> >  planner: blink
> >  # 'batch' or 'streaming' execution
> >  type: streaming
> >  # allow 'event-time' or only 'processing-time' in sources
> >  time-characteristic: event-time
> >  # interval in ms for emitting periodic watermarks
> >  periodic-watermarks-interval: 200
> >  # 'changelog' or 'table' presentation of results
> >  result-mode: table
> >  # maximum number of maintained rows in 'table' presentation of results
> >  max-table-result-rows: 1000000
> >  # parallelism of the program
> >  parallelism: 1
> >  # maximum parallelism
> >  max-parallelism: 128
> >  # minimum idle state retention in ms
> >  min-idle-state-retention: 0
> >  # maximum idle state retention in ms
> >  max-idle-state-retention: 0
> >  # current catalog ('default_catalog' by default)
> >  current-catalog: default_catalog
> >  # current database of the current catalog (default database of the catalog by default)
> >  current-database: default_database
> >  # controls how table programs are restarted in case of a failures
> >  restart-strategy:
> >    # strategy type
> >    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
> >    type: fallback
> >
>
> >#==============================================================================
> ># Configuration options
>
> >#==============================================================================
> >
> ># Configuration options for adjusting and tuning table programs.
> >
> ># A full list of options and their default values can be found
> ># on the dedicated "Configuration" web page.
> >
> ># A configuration can look like:
> ># configuration:
> >#   table.exec.spill-compression.enabled: true
> >#   table.exec.spill-compression.block-size: 128kb
> >#   table.optimizer.join-reorder-enabled: true
> >
>
> >#==============================================================================
> ># Deployment properties
>
> >#==============================================================================
> >
> ># Properties that describe the cluster to which table programs are submitted to.
> >
> >deployment:
> >  # general cluster communication timeout in ms
> >  response-timeout: 5000
> >  # (optional) address from cluster to gateway
> >  gateway-address: ""
> >  # (optional) port from cluster to gateway
> >  gateway-port: 0
> >
> >
> >
> >
> >
> >Cheers,
> >Enzo
> >
> >
> >On Tue, 26 May 2020 at 17:15, wldd <[email protected]> wrote:
> >
> >> Hi, Enzo wang
> >> The image fails to load and the GitHub URL is not accessible either. Could you check whether Hive can read and write the table normally?
> >>
> >> --
> >>
> >> Best,
> >> wldd
> >>
> >>
> >>
> >>
> >> On 2020-05-26 17:01:32, "Enzo wang" <[email protected]> wrote:
> >>
> >> Hi Wldd,
> >>
> >>
> >> Thanks for the reply.
> >>
> >>
> >> 1.  The datanode is available
> >>
> >>
> >> ❯ docker-compose exec namenode hadoop fs -ls /tmp
> >> Found 1 items
> >> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
> >>
> >>
> >> It also shows up in the namenode web UI:
> >>
> >>
> >>
> >>
> >> 2.  After running set execution.type=batch; the statement fails with the following error:
> >> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> >> File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
> >> could only be replicated to 0 nodes instead of minReplication (=1). There
> >> are 1 datanode(s) running and 1 node(s) are excluded in this operation.
> >>
> >>
> >> The full error is here:
> >> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
> >>
> >>
> >>
> >> On Tue, 26 May 2020 at 16:52, wldd <[email protected]> wrote:
> >>
> >> Issue 1:
> >>
> >> org.apache.hadoop.hdfs.BlockMissingException: you can use the hadoop fs
> >> command to check whether that datanode is reachable.
> >>
> >>
> >> Issue 2:
> >> Writing to Hive requires batch mode: set execution.type=batch;
> >>
> >> On 2020-05-26 16:42:12, "Enzo wang" <[email protected]> wrote:
> >>
> >> Hi Flink group,
> >>
> >>
> >> While looking at the Flink and Hive integration today, I ran into a few problems. Could you please help take a look?
> >> Reference URL:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
> >>
> >>
> >> Version and table schema information is here:
> >> https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
> >>
> >>
> >> Issue 1: Flink SQL fails to read the Hive table pokes
> >>
> >>
> >> Flink SQL> select * from pokes;
> >> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat
> >>                   - Total input paths to process : 4
> >> [ERROR] Could not execute SQL statement. Reason:
> >> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
> >> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
> >> file=/user/hive/warehouse/pokes/kv1.txt
> >>
> >>
> >> Issue 2: Flink SQL fails to write to the Hive table pokes
> >>
> >>
> >> Flink SQL> insert into pokes select 12,'tom';
> >> [INFO] Submitting SQL update statement to the cluster...
> >> [ERROR] Could not execute SQL statement. Reason:
> >> org.apache.flink.table.api.TableException: Stream Tables can only be
> >> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> >> UpsertStreamTableSink.
> >>
> >> Cheers,
> >> Enzo
>
