Flink SQL job: the JM BlobServer keeps reporting java.io.IOException: Unknown operation 71 in the early morning

2022-04-07, by su wenwen
Hi all, has anyone run into this problem? We are on Flink 1.12, and a Flink SQL job running in production keeps reporting the following error in the early morning hours:

[inline screenshot of the error]

My understanding is that the BlobServer transfers binary jar files from HDFS to the local working directory. I have not found a problem anywhere else, and the error has had no impact on the job's data.


Flink 1.13 / 1.14 SQL connector fails when writing Hive ORC files

2022-01-05, by su wenwen
Hello all,
I'd like to ask whether anyone has run into this problem.

Environment:
hadoop 2.10.1
hive 2.3.6
flink 1.12-1.14 (all of these versions were tested and all show the problem)

Dependency setup:
`HADOOP_CLASSPATH` added to the environment variables
the official flink-sql-connector-hive-2.3.6 jar placed under flink/lib

The job is submitted through sql-client; the SQL is as follows:

create table source_kafka(
order_id int,
order_date varchar,
customer_name varchar,
price decimal(10,3),
product_id int,
order_status boolean
)with(
  'connector' = 'kafka',
  'topic' = 'topn_char1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

create table datagen_source
with(  'connector' = 'datagen') like source_kafka(EXCLUDING OPTIONS);

create catalog myhive_local with ('type' = 'hive', 'hive-conf-dir' = '/Users/feng/hive-2.3.6/conf');
SET 'table.sql-dialect'='hive';
create table orders_local(
order_id int,
order_date string,
customer_name string,
price decimal(10,3),
product_id int,
order_status boolean
)partitioned by (dt string)
stored as orc;
SET 'table.sql-dialect'='default';
insert into myhive_local.cataloghive.orders_local
/*+ OPTIONS(
 'sink.partition-commit.trigger'='process-time',
 'sink.partition-commit.policy.kind'='metastore,success-file',
 'sink.rolling-policy.file-size'='128MB',
 'sink.rolling-policy.rollover-interval'='1min',
 'sink.rolling-policy.check-interval'='1min',
 'auto-compaction'='true',
 'compaction.file-size'='128MB'

) */
select * , '2099-09-00' as dt from datagen_source;

The job runs fine when submitted directly to a local cluster, but it then writes the table through the mapred writer, which is slow and cannot meet our performance requirements. Following the official documentation I added

set table.exec.hive.fallback-mapred-writer = false

After setting this option, every submission fails with a missing method from the ORC package, and I am not sure whether this is a dependency conflict or a problem with how the dependencies are configured.
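For reference, a minimal sketch of the same option in the quoted SET form (my own illustration, assuming the Flink 1.13+ sql-client syntax, not taken from the original mail). With the fallback disabled, the Hive sink uses Flink's native ORC/Parquet bulk writers instead of the Hive mapred record writer, which is why org.apache.flink.orc.writer classes appear in the trace below.

-- Sketch: disable the mapred fallback so Flink's native bulk writers are used.
SET 'table.exec.hive.fallback-mapred-writer' = 'false';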
Tables in parquet format work fine; only the ORC format fails, and every version from Flink 1.12 onwards gives the same error:

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.NoSuchMethodError: org.apache.orc.OrcFile$WriterOptions.getHadoopShims()Lorg/apache/orc/impl/HadoopShims;
at org.apache.flink.orc.writer.PhysicalWriterImpl.<init>(PhysicalWriterImpl.java:103)
at org.apache.flink.orc.writer.OrcBulkWriterFactory.create(OrcBulkWriterFactory.java:99)
at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory.create(FileSystemTableSink.java:630)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:75)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at StreamExecCalc$21.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.ja

Re: Re: Re: Does flink cdc support syncing an entire MySQL database into hudi?

2021-12-07, by su wenwen
Hi, I think Chengyanan is referring to the hudi website:
https://hudi.apache.org/docs/flink-quick-start-guide


From: casel.chen
Sent: 2021-12-08 00:19
To: user-zh@flink.apache.org
Subject: Re: Re: Does flink cdc support syncing an entire MySQL database into hudi?

"See the hudi official site for an example" -> Is there no link for that? Where on the official site is whole-database CDC ingestion into the lake described?


On 2021-12-07 10:23:03, "chengyanan1...@foxmail.com" wrote:
>Supported; see the hudi official site for an example.
>
>
>
>chengyanan1...@foxmail.com
>
>From: casel.chen
>Sent: 2021-12-06 23:55
>To: user-zh@flink.apache.org
>Subject: Does flink cdc support syncing an entire MySQL database into hudi?
>Does flink cdc support syncing an entire MySQL database into hudi? If it does, I would appreciate an example, and it also needs to support schema changes. Thanks!


Re: apache-flink - aggregate function produces no output in pyflink 1.14

2021-12-05, by su wenwen
Hi, Zebing!

You can open the Flink Web UI at localhost:8081 and check whether the job is actually running. Also, the JSON records written to Kafka should use double quotes, for example:
{"amount": 500, "course_code": "97iscn4g8k","event_time":"2021-12-01 17:54:41"}

Window aggregation also depends on the watermark making progress.
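To make the watermark point concrete, here is a sketch that reuses the tumbling-window query from the original mail; the firing condition in the comments assumes standard Flink event-time window semantics.

-- With the job's watermark definition
--   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
-- a one-minute window such as [17:44:00, 17:45:00) is only emitted after the
-- watermark passes the window end, i.e. once a record with
-- event_time >= 17:45:05 has been read. If no newer records arrive on the
-- topic, the last open window never fires and the print sink stays silent.
insert into print_sink select
    count(amount) as amount
    ,course_code
    ,tumble_start(event_time, interval '1' minute) as event_time_start
    ,tumble_end(event_time, interval '1' minute) as event_time_end
from random_source
group by tumble(event_time, interval '1' minute), course_code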

From: duanzebing
Sent: 2021-12-04 18:14
To: user-zh@flink.apache.org
Subject: apache-flink - aggregate function produces no output in pyflink 1.14

Hi everyone,

I am a pyflink beginner and have run into a problem where an aggregation in Flink SQL never reaches the sink. My code follows the official documentation exactly, yet I still cannot solve it, so I am turning to you all for help.
My source statement is:
CREATE TABLE random_source (
amount int,
course_code string,
`event_time` TIMESTAMP(3) ,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'ding_broadcast_vip_order_test',
  'properties.bootstrap.servers' = '192.168.100.135:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)
The sink is:
CREATE TABLE print_sink (
    amount BIGINT,
    course_code string,
    `event_time_start` TIMESTAMP(3),
    `event_time_end` TIMESTAMP(3)
) WITH (
    'connector' = 'print'
)
I have tried the following two ways of aggregating, and both hit the same problem:
insert into print_sink select
    count(amount) as amount
    ,course_code
    ,tumble_start(event_time, interval '1' minute) as event_time_start
    ,tumble_end(event_time, interval '1' minute) as event_time_end
from random_source
group by tumble(event_time, interval '1' minute),course_code
and:
insert into print_sink select
    sum(amount),course_code,window_start,window_end
from table(
    tumble(
        table random_source,
        descriptor(event_time),
        interval '1' minutes
    )
) group by window_start, window_end, course_code
My Kafka data looks like this:
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:44:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:45:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:46:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:47:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:48:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:49:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:50:40'}
{'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01 17:51:40'}
   
I expected it to window the data by minute and aggregate by course_code, but the sink produces no output at all, as if it never received any data. However, a plain select * from random_source does produce output at the sink.
I am at a loss, so any advice would be appreciated. My problematic code is attached below.
Many thanks, everyone.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment


def hello_world():
    """
    Read data from the random source and output it directly through the print sink.
    """
    settings = EnvironmentSettings.in_streaming_mode()
    # env = StreamExecutionEnvironment.get_execution_environment()
    # t_env = StreamTableEnvironment.create(s_env)
    t_env = TableEnvironment.create(settings)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")

    source_ddl = """
        CREATE TABLE random_source (
            amount int,
            course_code string,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'ding_broadcast_vip_order_test',
            'properties.bootstrap.servers' = '192.168.100.135:9092',
            'properties.group.id' = 'testGroup',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    # f_sequence_ as PROCTIME()  # METADATA FROM 'timestamp'
    # register the source
    t_env.execute_sql(source_ddl)
    # data extraction

    sink_ddl = """
        CREATE TABLE print_sink (
            amount BIGINT,
            course_code string,
            `event_time_start` TIMESTAMP(3),
            `event_time_end` TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """
    # register the sink
    t_env.execute_sql(sink_ddl)

    t_env.execute_sql('''insert into print_sink select
        count(amount) as amount
        ,course_code
        ,tumble_start(event_time, interval '1' 

Re: ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

2021-12-03, by su wenwen
The error looks related to the format of the log4j configuration file; try replacing the log4j.properties file with a log4j2.xml.


From: summer
Sent: 2021-12-02 11:32
To: user-zh@flink.apache.org
Subject: ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

When integrating Flink 1.13.3 with my CDH 6.3.2 cluster, this error appears in the log while running flink-sql:


ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
10:45:50.236 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
JobManager Web Interface: http://lo-t-work3:8081
The Flink Yarn cluster has failed.
10:56:08.877 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed


What could be causing this?


Re: Unsubscribe

2021-12-03, by su wenwen
To unsubscribe, send a message to user-zh-unsubscr...@flink.apache.org


From: ™薇维苿尉℃
Sent: 2021-12-03 17:34
To: user-zh
Subject: Unsubscribe

Unsubscribe


Flink Web UI job overview does not show the number of records received by operators

2021-11-29, by su wenwen



Hello all,
Why does the Flink Web UI job overview not show the number of records and bytes received and sent by each operator when pipeline.operator-chaining is set to true?
The screenshot is attached.
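A minimal sketch (my own illustration, not from the original mail) of toggling the option in question from sql-client: with chaining disabled, operators are no longer fused into one task, so the Web UI can report records and bytes at each operator boundary, at the cost of extra serialization between operators.

-- Hypothetical session setting, assuming the sql-client quoted SET syntax:
SET 'pipeline.operator-chaining' = 'false';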

zhiyuan.franklin
zhiyuan.frank...@outlook.com