Re: flink json ddl解析

2020-09-01 文章 zilong xiao
基本类型包装一层会导致解析不出来  这个没太明白,可以举个列子吗?

Dream-底限  于2020年9月1日周二 下午2:20写道:

> hi、
> 我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况
>
> zilong xiao  于2020年9月1日周二 上午11:47写道:
>
> > like this:  ARRAY String>>>
> >
> > Dream-底限  于2020年9月1日周二 上午11:40写道:
> >
> > > hi
> > >
> > >
> >
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> > > array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
> > >
> > >
> > > private static TypeInformation convertArray(String location,
> > > JsonNode node, JsonNode root) {
> > >// validate items
> > >if (!node.has(ITEMS)) {
> > >   throw new IllegalArgumentException(
> > >  "Arrays must specify an '" + ITEMS + "' property in node: " +
> > > location);
> > >}
> > >final JsonNode items = node.get(ITEMS);
> > >
> > >// list (translated to object array)
> > >if (items.isObject()) {
> > >   final TypeInformation elementType = convertType(
> > >  location + '/' + ITEMS,
> > >  items,
> > >  root);
> > >   // result type might either be ObjectArrayTypeInfo or
> > > BasicArrayTypeInfo for Strings
> > >   return Types.OBJECT_ARRAY(elementType);
> > >}
> > >// tuple (translated to row)
> > >else if (items.isArray()) {
> > >   final TypeInformation[] types = convertTypes(location + '/' +
> > > ITEMS, items, root);
> > >
> > >   // validate that array does not contain additional items
> > >   if (node.has(ADDITIONAL_ITEMS) &&
> > > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> > >  throw new IllegalArgumentException(
> > > "An array tuple must not allow additional items in node: "
> > > + location);
> > >   }
> > >
> > >   return Types.ROW(types);
> > >}
> > >throw new IllegalArgumentException(
> > >   "Invalid type for '" + ITEMS + "' property in node: " +
> location);
> > > }
> > >
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 文章 zilong xiao
想问下你用的flink哪个版本呢?
如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
<http://dn-rt199.jja.bigo:8042/node/containerlogs/container_e19_1597907464753_1954_01_01/zengkejie/launch_container.sh/?start=-4096>脚本时,脚本中不再`export
 _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
`env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可

以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~

Jim Chen  于2020年8月31日周一 上午11:33写道:

> 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
>
>
> zilong xiao  于2020年8月27日周四 下午7:24写道:
>
> > 如果是用CLI方式提交作业的话是可以做到的
> >
> > Jim Chen  于2020年8月27日周四 下午6:13写道:
> >
> > > 如果是自动以PatternLayout的话,我有几点疑问:
> > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > >
> > > 如果使用env的话
> > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > >
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > >
> > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > >
> > > >
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > 2:这些属性有办法可以从环境变量中获取
> > > >
> > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > >
> > > > > 大家好:
> > > > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > >
> %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > >
> > > >
> > >
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-31 文章 zilong xiao
可以用程序来完成的,flink-conf.yaml里可以先用占位符,例如 `env.java.opts:
-Djob.name={{job_name}}`  在你提交作业之前,先读到这个模板文件,在代码里去replace该占位符就好,不需要手动去改

Jim Chen  于2020年8月31日周一 下午1:33写道:

> 我也是flink1.10.1的版本的,如果按照你的方法,每次启动一个任务,都要在flink-conf.yaml中修改一下`env.java.opts:
> -Djob.name=xxx`吗?这样的话,是不是太麻烦了
>
> zilong xiao  于2020年8月31日周一 下午12:08写道:
>
> > 想问下你用的flink哪个版本呢?
> > 如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
> > jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
> > 如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
> > <
> >
> http://dn-rt199.jja.bigo:8042/node/containerlogs/container_e19_1597907464753_1954_01_01/zengkejie/launch_container.sh/?start=-4096
> > >脚本时,脚本中不再`export
> >  _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
> > `env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可
> >
> > 以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~
> >
> > Jim Chen  于2020年8月31日周一 上午11:33写道:
> >
> > > 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
> > >
> > >
> > > zilong xiao  于2020年8月27日周四 下午7:24写道:
> > >
> > > > 如果是用CLI方式提交作业的话是可以做到的
> > > >
> > > > Jim Chen  于2020年8月27日周四 下午6:13写道:
> > > >
> > > > > 如果是自动以PatternLayout的话,我有几点疑问:
> > > > >
> > > > >
> > > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > > > >
> > > > > 如果使用env的话
> > > > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > > > >
> > >
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > > > >
> > > > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > > > >
> > > > > >
> > > >
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > > > 2:这些属性有办法可以从环境变量中获取
> > > > > >
> > > > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > > > >
> > > > > > > 大家好:
> > > > > > >
> >  我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > > > >
> > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> > > job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: flink json ddl解析

2020-08-31 文章 zilong xiao
like this:  ARRAY>>

Dream-底限  于2020年9月1日周二 上午11:40写道:

> hi
>
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
>
>
> private static TypeInformation convertArray(String location,
> JsonNode node, JsonNode root) {
>// validate items
>if (!node.has(ITEMS)) {
>   throw new IllegalArgumentException(
>  "Arrays must specify an '" + ITEMS + "' property in node: " +
> location);
>}
>final JsonNode items = node.get(ITEMS);
>
>// list (translated to object array)
>if (items.isObject()) {
>   final TypeInformation elementType = convertType(
>  location + '/' + ITEMS,
>  items,
>  root);
>   // result type might either be ObjectArrayTypeInfo or
> BasicArrayTypeInfo for Strings
>   return Types.OBJECT_ARRAY(elementType);
>}
>// tuple (translated to row)
>else if (items.isArray()) {
>   final TypeInformation[] types = convertTypes(location + '/' +
> ITEMS, items, root);
>
>   // validate that array does not contain additional items
>   if (node.has(ADDITIONAL_ITEMS) &&
> node.get(ADDITIONAL_ITEMS).isBoolean() &&
> node.get(ADDITIONAL_ITEMS).asBoolean()) {
>  throw new IllegalArgumentException(
> "An array tuple must not allow additional items in node: "
> + location);
>   }
>
>   return Types.ROW(types);
>}
>throw new IllegalArgumentException(
>   "Invalid type for '" + ITEMS + "' property in node: " + location);
> }
>


Re: flink json ddl解析

2020-09-01 文章 zilong xiao
问题大概懂了,坐等Flink大佬回复

Dream-底限  于2020年9月1日周二 下午4:43写道:

> hi
> 就是json数组如果是这种:[1,2,3],我可以直接array解析
>
> 如果json数组是这种:[1,"test",true],如果我用array>程序是没办法运行的,如果我用array int,b string,c boolean>>,flink做ddl翻译解析json的时候会把row boolean>这一部分映射为解析jsonobject,但是array元素不是jsonobject会导致取不到数据
>
> zilong xiao  于2020年9月1日周二 下午4:04写道:
>
> > 基本类型包装一层会导致解析不出来  这个没太明白,可以举个列子吗?
> >
> > Dream-底限  于2020年9月1日周二 下午2:20写道:
> >
> > > hi、
> > >
> >
> 我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况
> > >
> > > zilong xiao  于2020年9月1日周二 上午11:47写道:
> > >
> > > > like this:  ARRAY > > String>>>
> > > >
> > > > Dream-底限  于2020年9月1日周二 上午11:40写道:
> > > >
> > > > > hi
> > > > >
> > > > >
> > > >
> > >
> >
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> > > > > array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
> > > > >
> > > > >
> > > > > private static TypeInformation convertArray(String location,
> > > > > JsonNode node, JsonNode root) {
> > > > >// validate items
> > > > >if (!node.has(ITEMS)) {
> > > > >   throw new IllegalArgumentException(
> > > > >  "Arrays must specify an '" + ITEMS + "' property in node:
> "
> > +
> > > > > location);
> > > > >}
> > > > >final JsonNode items = node.get(ITEMS);
> > > > >
> > > > >// list (translated to object array)
> > > > >if (items.isObject()) {
> > > > >   final TypeInformation elementType = convertType(
> > > > >  location + '/' + ITEMS,
> > > > >  items,
> > > > >  root);
> > > > >   // result type might either be ObjectArrayTypeInfo or
> > > > > BasicArrayTypeInfo for Strings
> > > > >   return Types.OBJECT_ARRAY(elementType);
> > > > >}
> > > > >// tuple (translated to row)
> > > > >else if (items.isArray()) {
> > > > >   final TypeInformation[] types = convertTypes(location +
> '/'
> > +
> > > > > ITEMS, items, root);
> > > > >
> > > > >   // validate that array does not contain additional items
> > > > >   if (node.has(ADDITIONAL_ITEMS) &&
> > > > > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > > > > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> > > > >  throw new IllegalArgumentException(
> > > > > "An array tuple must not allow additional items in
> node:
> > "
> > > > > + location);
> > > > >   }
> > > > >
> > > > >   return Types.ROW(types);
> > > > >}
> > > > >throw new IllegalArgumentException(
> > > > >   "Invalid type for '" + ITEMS + "' property in node: " +
> > > location);
> > > > > }
> > > > >
> > > >
> > >
> >
>


Flink Plan Visualizer

2020-09-08 文章 zilong xiao
hi,想问下,Flink Plan Visualizer能画job graph吗?网上查貌似只能根据execution plan画steaming
graph?


Re: Flink Plan Visualizer

2020-09-09 文章 zilong xiao
有可以画job graph的方法吗?

黄潇  于2020年9月8日周二 下午8:32写道:

> Hi,
>
> 据我所知,使用 env.getExecutionPlan() 得到的 json 字符串[1]只包含 stream graph
> 的信息,所以这样画出来的图是 stream graph。
> 在 job 提交之后的 web ui 中可以看到经过 operator chain 之后的图信息。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/execution_plans.html
>
> zilong xiao  于2020年9月8日周二 下午8:00写道:
>
> > hi,想问下,Flink Plan Visualizer能画job graph吗?网上查貌似只能根据execution plan画steaming
> > graph?
> >
>


Re: RocksDBStateBackend 问题

2020-09-07 文章 zilong xiao
可以看下这个文档:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend

guaishushu1...@163.com  于2020年9月7日周一 下午5:47写道:

> 想问下关于RocksDBStateBackend
> 是直接把状态存在rocksdb数据库,还是等内存满了再存到RocksDB数据库。如果直接存在RocksDB数据库,那岂不是很影响数据处理速度。
>
>
>
> guaishushu1...@163.com
>


Re: 如何在启动taskmanager时传入自定义的java参数

2020-09-15 文章 zilong xiao
可以在flink-conf.yaml里设置,例如:
env.java.opts: -Djob.name={{job_name}}

xiao cai  于2020年9月15日周二 下午5:46写道:

> Hi:
> 我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run
> 后加-D的方式来添加,有什么好的办法吗?


Re: 请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 zilong xiao
可否发下是哪个配置,有相关的文档吗?

superainbower  于2020年9月4日周五 下午5:24写道:

> 1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题
>
>
> | |
> superainbower
> |
> |
> superainbo...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年09月4日 15:11,taochanglian 写道:
> 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。
>
> 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key
> hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。
>
> 在 2020/9/4 13:14, Benchao Li 写道:
> 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。
> 要处理这种情况,可以了解下idle source[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>
> samuel@ubtrobot.com  于2020年9月3日周四 下午3:41写道:
>
> 补充一下环境信息:
>
> 有点类似以下问题:
> 在1.11版本测试flink sql时发现一个问题,用streaming api
> 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui
> watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka
> topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。
>
> 不确定是否是因为kafka多分区引起的?
>
>
>
> 发件人: samuel@ubtrobot.com
> 发送时间: 2020-09-03 09:23
> 收件人: user-zh
> 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢!
> 谢谢回复!
>
> 是Flink1.11.1的版本
>
> 以下是代码:
> package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements.  See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership.  The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License.  You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
>
> import org.apache.commons.collections.map.HashedMap;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.state.BroadcastState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
>
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import org.apache.flink.streaming.api.functions.co
> .BroadcastProcessFunction;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import
>
> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONArray;
> import com.alibaba.fastjson.JSONObject;
> import com.alibaba.fastjson.parser.Feature;
> import com.ubtechinc.dataplatform.flink.util.AES256;
> import com.ubtechinc.dataplatform.flink.util.ConstantStr;
> import com.ubtechinc.dataplatform.flink.util.MailUtils;
> import com.ubtechinc.dataplatform.flink.util.SmsUtil;
> import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;
>
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import com.mysql.jdbc.Connection;

Re: 无法savepoint

2020-09-02 文章 zilong xiao
看官方文档 cancel 语法格式是:Syntax: cancel [OPTIONS] ,所以-yid xxx是不是要放到job
id之前? 另外文档中有提示到Cancel with a savepoint (deprecated), 建议使用stop语法,见:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

希望对你有帮助,祝好~

x <35907...@qq.com> 于2020年9月3日周四 上午11:30写道:

> /flink/flink-1.10.1/bin/flink cancel -s
> hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
> f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> 提示Unrecognized option: -yid


Flink SQL 1.11如何获取执行计划 & StreamGraph

2020-10-16 文章 zilong xiao
1.11好像改了接口,用StreamExecutionEnvironment.getExecutionPlan()会报"No operators
defined in streaming topology. Cannot execute." 1.10是可以正常执行的


Re: Flink SQL 1.11如何获取执行计划 & StreamGraph

2020-10-17 文章 zilong xiao
如题所说,以下是我个人分析,希望社区大佬帮忙看看是否正确,根据源码 遇到No
operators...异常的原因是transformations为空,在1.10版本,transformations的赋值动作(addOperator方法)在sqlUpdate()调用时触发,在1.11版本移除掉了该方法,transformations的赋值动作在statementSet.execute()时才会执行,是不是可以理解为transformations不会提前赋值,只有提交时才会初始化,所以在作业提交前无法获取作业的执行计划,我觉得这个是不是不太满足用户使用场景,不知道1.11这么做是有什么取舍吗?

zilong xiao  于2020年10月16日周五 下午8:56写道:

> 1.11好像改了接口,用StreamExecutionEnvironment.getExecutionPlan()会报"No operators
> defined in streaming topology. Cannot execute." 1.10是可以正常执行的
>


Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 zilong xiao
Hi Robin Zhang
你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626
,可以看下这个issue描述,祝好~

Robin Zhang  于2020年10月19日周一 下午3:42写道:

> 普通的source -> map -> filter-> sink 测试应用。
>
> 触发savepoint的脚本 :
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> 具体报错信息:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at
> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more
>
>
>
> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-25 文章 zilong xiao
1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
2:这些属性有办法可以从环境变量中获取

Jim Chen  于2020年8月25日周二 下午4:49写道:

> 大家好:
> 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
>


Re: flink1.11 sql问题

2020-08-25 文章 zilong xiao
直接CAST不可以吗?

酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:

> 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
>
>
>
>
> 在2020年08月25日 15:34,taochanglian 写道:
> flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
>
> 在 2020/8/25 14:59, 酷酷的浑蛋 写道:
> 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>


Re: Flink SQL Map类型字段大小写不敏感支持

2020-08-26 文章 zilong xiao
这个有相关的issue可以follow吗?

Danny Chan  于2020年8月26日周三 下午8:42写道:

> 您好 现在 Flink SQL 是大小写敏感的 目前还没有计划开启大小写不敏感。
>
> Best,
> Danny Chan
> 在 2020年8月21日 +0800 AM11:04,zilong xiao ,写道:
> > 如题,在业务中有遇到过在Map类型字段中有表示同一个含义但是大小写不一致的情况,比如有个Map字段 my_map,里面可能存在key
> > aB,ab,Ab,在SQL中取值时能否能做到大小写不敏感呢,my_map['ab'] 能取所有相关key的值
>


Re: 如何设置FlinkSQL并行度

2020-08-29 文章 zilong xiao
SQL 算子并行度设置可以自己实现,可以私下交流下,正好在做这块,基本能工作了

JasonLee <17610775...@163.com> 于2020年8月23日周日 下午2:07写道:

> hi
> checkpoint savepoint的问题可以看下这个
> https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink SQL Map类型字段大小写不敏感支持

2020-08-20 文章 zilong xiao
如题,在业务中有遇到过在Map类型字段中有表示同一个含义但是大小写不一致的情况,比如有个Map字段 my_map,里面可能存在key
aB,ab,Ab,在SQL中取值时能否能做到大小写不敏感呢,my_map['ab'] 能取所有相关key的值


Re: Flink 启动问题

2020-08-20 文章 zilong xiao
-yt应该只能写一个目录,你有什么痛点呢?

guaishushu1...@163.com  于2020年8月20日周四 下午8:40写道:

>  大佬们知道 flink 的-yt命令是不支持多个目录吗,而且只能上传到集群.jar文件吗???
>
>
>
> guaishushu1...@163.com
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-27 文章 zilong xiao
如果是用CLI方式提交作业的话是可以做到的

Jim Chen  于2020年8月27日周四 下午6:13写道:

> 如果是自动以PatternLayout的话,我有几点疑问:
>
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
>
> 如果使用env的话
> 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
>
> zilong xiao  于2020年8月25日周二 下午5:32写道:
>
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > 2:这些属性有办法可以从环境变量中获取
> >
> > Jim Chen  于2020年8月25日周二 下午4:49写道:
> >
> > > 大家好:
> > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > >
> >
>


Re: Flink SQL Map类型字段大小写不敏感支持

2020-08-26 文章 zilong xiao
好的,了解了,谢谢啦~

Leonard Xu  于2020年8月26日周三 下午9:26写道:

> Hi,zilong
>
> 之前我建了一个issue[1]支持大小写敏感, 有了个初步的PR,但是社区想做全套,字段名,表名,catalog名都统一解决,所以还没支持
>
> 祝好
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-16175?filter=12347488 <
> https://issues.apache.org/jira/browse/FLINK-16175?filter=12347488>
>
> > 在 2020年8月26日,20:47,zilong xiao  写道:
> >
> > 这个有相关的issue可以follow吗?
> >
> > Danny Chan  于2020年8月26日周三 下午8:42写道:
> >
> >> 您好 现在 Flink SQL 是大小写敏感的 目前还没有计划开启大小写不敏感。
> >>
> >> Best,
> >> Danny Chan
> >> 在 2020年8月21日 +0800 AM11:04,zilong xiao ,写道:
> >>> 如题,在业务中有遇到过在Map类型字段中有表示同一个含义但是大小写不一致的情况,比如有个Map字段 my_map,里面可能存在key
> >>> aB,ab,Ab,在SQL中取值时能否能做到大小写不敏感呢,my_map['ab'] 能取所有相关key的值
> >>
>
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-27 文章 zilong xiao
想确认下你是用什么方式提交作业呢?是CLI吗?

Jim Chen  于2020年8月27日周四 下午6:13写道:

> 如果是自动以PatternLayout的话,我有几点疑问:
>
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
>
> 如果使用env的话
> 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
>
> zilong xiao  于2020年8月25日周二 下午5:32写道:
>
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > 2:这些属性有办法可以从环境变量中获取
> >
> > Jim Chen  于2020年8月25日周二 下午4:49写道:
> >
> > > 大家好:
> > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > >
> >
>


Re: Flink SQL Map类型字段大小写不敏感支持

2020-08-27 文章 zilong xiao
是的,就是希望能忽略大小写,有尝试过用UDF去支持忽略大小写,但是取值的效率会比较低,然后脑洞比较大,跑来社区问问未来有没有可能在定义表DDL时,提供一个可选配置,用于开关是否忽略大小写,不过感觉这个可能不太好搞吧,自己的一个小脑洞而已,还是非常感谢几位大佬的回复~

Jark Wu  于2020年8月27日周四 下午8:01写道:

> 额,我觉得楼上的理解错楼主的意思了吧。
>
> 如果我理解的没错,楼主的意思是取 MAP 中的值的时候,key 能忽略大小写。 比如 my_map['ab'] 能取到  'aB', 'Ab'
> 的数据。
>
> 我觉得这个需求有点违反 map 的行为了,在所有的编程语言中,map 的 key 都是只能对应唯一一个 value 的,大小写要严格匹配的。
> 如果想要实现这种效果,你可以先将原先的 map 转成小写后的 key,value 为原先 'aB', 'Ab' ...  的 value list。
>
>
> Best,
> Jark
>
>
>
>
>
> On Thu, 27 Aug 2020 at 09:56, zilong xiao  wrote:
>
> > 好的,了解了,谢谢啦~
> >
> > Leonard Xu  于2020年8月26日周三 下午9:26写道:
> >
> > > Hi,zilong
> > >
> > > 之前我建了一个issue[1]支持大小写敏感, 有了个初步的PR,但是社区想做全套,字段名,表名,catalog名都统一解决,所以还没支持
> > >
> > > 祝好
> > > Leonard
> > > [1] https://issues.apache.org/jira/browse/FLINK-16175?filter=12347488
> <
> > > https://issues.apache.org/jira/browse/FLINK-16175?filter=12347488>
> > >
> > > > 在 2020年8月26日,20:47,zilong xiao  写道:
> > > >
> > > > 这个有相关的issue可以follow吗?
> > > >
> > > > Danny Chan  于2020年8月26日周三 下午8:42写道:
> > > >
> > > >> 您好 现在 Flink SQL 是大小写敏感的 目前还没有计划开启大小写不敏感。
> > > >>
> > > >> Best,
> > > >> Danny Chan
> > > >> 在 2020年8月21日 +0800 AM11:04,zilong xiao ,写道:
> > > >>> 如题,在业务中有遇到过在Map类型字段中有表示同一个含义但是大小写不一致的情况,比如有个Map字段 my_map,里面可能存在key
> > > >>> aB,ab,Ab,在SQL中取值时能否能做到大小写不敏感呢,my_map['ab'] 能取所有相关key的值
> > > >>
> > >
> > >
> >
>


Re: flink on yarn日志问题

2020-08-27 文章 zilong xiao
如果是用命令行的方式提交作业,可以在环境变量中获取APP IP,在作业以pre job方式提交到集群时,会执行 launch_container.sh

,里面export了很多变量,其中就有_APP_ID

Jim Chen  于2020年8月27日周四 下午6:17写道:

> 能分享一下demo吗?
>
> Cayden chen <1193216...@qq.com> 于2020年7月15日周三 下午2:56写道:
>
> > 我们的获取逻辑是通过定义
> >
> logback的appder,appder通过解析当前系统路径(因为flink每个taskmanager会自己定义一个带有applicationId的路径,然后里面会放各种jar包,包括我自定义的appder),获取之后通过MDC.put(),给日志加一列appId,在appder里面把日志上报到外部的日志系统
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> >   "user-zh"
> > <
> > rjia...@163.com;
> > 发送时间:2020年7月14日(星期二) 下午5:31
> > 收件人:"user-zh@flink.apache.org" >
> > 主题:回复:   flink on yarn日志问题
> >
> >
> >
> >
> >
> > 我们获取运行Yarn日志逻辑大概是这样的:
> > 1.
> >
> 访问rm-address/ws/v1/cluster/apps/applicationId,拿到amContainerLog中的url即为jm的url.
> > 2.
> >
> taskmanager日志url通过rm-address/proxy/applicationId/taskmanagers,拿到所有taskmanager的基本信息,替换amContainerLog中的containername和ip。
> >
> >
> > 日志比较大时:指定读取的字节开始和结束位置,url?start=0end=1024
> > | |
> > jianxu
> > |
> > |
> > rjia...@163.com
> > |
> > 在2020年07月14日 17:07,Cayden chen<1193216...@qq.com 写道:
> > 有个问题,如何区分日志是哪个任务的呢
> >
> >
> >
> >
> > --nbsp;原始邮件nbsp;--
> >
> 发件人:
> >
> "user-zh"
> >  > 发送时间:nbsp;2020年7月14日(星期二) 下午5:05
> > 收件人:nbsp;"user-zh" >
> > 主题:nbsp;Re: Re: Re: flink on yarn日志问题
> >
> >
> >
> > Flink在1.11开始默认使用log4j2, log4j2已经有了很多appender[1]可以用来将日志输出到外部系统或服务。
> >
> > [1] https://logging.apache.org/log4j/2.x/manual/appenders.html
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 14, 2020 at 4:46 PM nicygan  > gt;
> > gt; 是有这个毛病,看TM日志不方便。
> > gt;
> > gt; 而且本地日志过几小时就会被清理,时间一久就看不到了,只剩JM日志。
> > gt;
> > gt;
> > gt;
> > gt;
> > gt;
> > gt;
> > gt; 在 2020-07-14 12:35:06,"zhisheng"  gt;
> > 写道:
> > gt; gt;知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager
> > 的日志(可以拼出路径),然后复制到本地去查看
> > gt; gt;
> > gt; gt;Yangze Guo  > 上午11:58写道:
> > gt; gt;
> > gt; gt;gt; Hi, 王松
> > gt; gt;gt;
> > gt; gt;gt; 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
> > gt; gt;gt;
> > gt; gt;gt; Best,
> > gt; gt;gt; Yangze Guo
> > gt; gt;gt;
> > gt; gt;gt; On Tue, Jul 14, 2020 at 8:26 AM 王松 <
> > sdlcwangson...@gmail.comgt; wrote:
> > gt; gt;gt; gt;
> > gt; gt;gt; gt; 我们也有问题 1,和 Yangze Guo
> > 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
> > gt; gt;gt; gt;
> > gt; gt;gt; gt; Yangze Guo  gt;
> > 于2020年7月13日周一 下午5:03写道:
> > gt; gt;gt; gt;
> > gt; gt;gt; gt; gt; 1.
> > gt; gt;gt; gt; gt;
> > gt; gt;gt;
> >
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> > gt; gt;gt; gt; gt; 2. 你是否需要调整一下重启策略[1]?
> > 如果开启了ck,默认情况下就会一直尝试重启job
> > gt; gt;gt; gt; gt;
> > gt; gt;gt; gt; gt; [1]
> > gt; gt;gt; gt; gt;
> > gt; gt;gt;
> >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> > gt; gt;gt; gt; gt;
> > gt; gt;gt; gt; gt; Best,
> > gt; gt;gt; gt; gt; Yangze Guo
> > gt; gt;gt; gt; gt;
> > gt; gt;gt; gt; gt;
> > gt; gt;gt; gt; gt; On Mon, Jul 13, 2020 at 2:40
> > PM 程龙 <13162790...@163.comgt; wrote:
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt; 不好意思nbsp;
> > 怪我灭有描述清楚
> > gt; gt;gt; gt; gt; gt; 1 目前开启日志收集功能
> > gt; gt;gt; gt; gt; gt; 2 目前已是 per-job模式
> > gt; gt;gt; gt; gt; gt; 3 集群使用cdh flink.1.10
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt; 在 2020-07-13
> > 11:18:46,"Yangze Guo"  > gt; gt;gt; gt; gt; gt; gt;Hi,
> > gt; gt;gt; gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt;第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > gt; gt;gt; gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt;
> > gt;第二个问题,您可以尝试一下per-job mode [2][3]
> > gt; gt;gt; gt; gt; gt; gt;
> > gt; gt;gt; gt; gt; gt; gt;[1]
> > gt; gt;gt; gt; gt;
> > gt; gt;gt;
> >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > gt
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-filesgt
> >;
> > gt;gt; gt; gt; gt; gt;[2]
> > gt; gt;gt; gt; gt;
> > gt; gt;gt;
> >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > gt
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-modegt
> >;
> > gt;gt; gt; gt; gt; gt;[3]
> > gt; gt;gt; gt; gt;
> > gt; gt;gt;
> >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > gt
> > <
> 

Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

2020-08-27 文章 zilong xiao
图挂了,用图床工具贴链接吧

范超  于2020年8月28日周五 上午11:37写道:

> Hi, 大家好
>
> Flink版本 1.10.0
>
>
>
> 目前程序的checkpoint使用rocksdb的方式存储在hdfs上,在sink失败的时候能够正常从上一个checkpoint点恢复。
>
> 问题是由于升级程序,我使用了命令行
>
> *bin/flink stop -p ${hdfsSavepointDir} -d $runningJobId -yid $yarnAppId*
>
>
>
> 将savepoint文件保存,然后再使用保存的savepoint来启动程序
>
> */bin/flink run -d -m yarn-cluster -p ${parallelism} -yjm ${jm} -ytm ${tm}
> $fullJarPath -s $savePointFullPath –c xxx*
>
>
>
> 比较无法理解的是,jm和tm日志都显示成功启动,但是无法看到从checkpoint恢复的记录如下图所示:
>
>
>
> 有知道的大佬知道是不是我哪里处理不正常么?
>


Re: 编译Flink时找不到scala-maven-plugin:3.1.4

2020-09-23 文章 zilong xiao
Hi Natasha,
在mvn命令中加上这两个参数试试看 -Dscala-2.12 -Pscala-2.12

Natasha <13631230...@163.com> 于2020年9月23日周三 下午4:00写道:

> Hi All,
> 很高兴加入Flink这个大家庭!但是有个问题困扰了我好久!
> 当我导入Flink到IDEA中准备进行编译,输入“mvn clean install -Drat.skip=true
> -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true”后,
> 报错“Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.1.4:testCompile
> (scala-test-compile) on project flink-runtime_2.10”,
>
> 我尝试了网上的方法,修改pom.xml文件中scala-maven-plugin的版本,或者是让IDEA的Scala版本与Windows的Scala版本保持一致,但是都不起作用!
>
>
>
>
> Best,
> Natasha
>
>
> | |
> Natasha
> |
> |
> |
> 签名由网易邮箱大师定制


Re: Submit New Job -> Show Plan源码

2020-09-25 文章 zilong xiao
在工程里全局搜索关键字找到的代码貌似不是这个功能的,想知道这个接口调用的入口类类名是哪个呢?

zilong xiao  于2020年9月25日周五 下午3:38写道:

> [image: image.png]
> Hi, 想问下社区有大佬知道Show Plan调用接口的源码在哪块吗?
>


Re: flink读取mongo数据本地部署成功,flink-cluster部署找不到类

2020-09-25 文章 zilong xiao
有可能是依赖冲突了,可以尝试用maven shade jar看看

GeGe  于2020年9月25日周五 下午3:54写道:

>
>
> 您好!
>
>
> 我用flink从mongo读取数据,在本地运行成功,但是部署到本地flink-cluster中却报错:
>
>
>
> java.lang.NoClassDefFoundError: com/mongodb/MongoClient
>
> at java.lang.Class.getDeclaredMethods0(Native Method)
>
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>
> at java.lang.Class.getDeclaredMethod(Class.java:2128)
>
> at
> org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:164)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:89)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1571)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1553)
>
> at
> com.test.flink.service.MongoFlinkMainService.main(MongoFlinkMainService.java:23)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
> Caused by: java.lang.ClassNotFoundException: com.mongodb.MongoClient
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 24 more
>
>
>
>
>
> 之前得到的数据是org.bson.Document格式,但是报错:找不到Document,所以就换成了String类型。现在报错的类也是在同一个jar包下:mongo-driver-java-3.8.2.jar
> ,换了各种版本,解压maven的jar包查看也有这个jar包。现在就是不知道为什么找不到这个jar包下的任何类。
>
>
> 请求各位大神帮帮忙~~非常非常感谢~~
>
>
> Best wishes,
>
>
> Gege
> | |
> Ge Ge
> Software Engineer
> |
> |
> gg13871077...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 zilong xiao
JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀

xiao cai  于2020年9月25日周五 下午4:43写道:

>
> 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
>
>
>  原始邮件
> 发件人: zilong xiao
> 收件人: user-zh
> 发送时间: 2020年9月25日(周五) 16:32
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> Java程序用process调用脚本提交任务没啥问题吧,获取jobId的问题,我理解可以用yarn rest api &
> flink rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五
> 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: >
> 想要在自己的项目中(springboot)提交flink >
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > >
> best, > xiao


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 zilong xiao
我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest api啊,这些信息都是可以拿到的

xiao cai  于2020年9月25日周五 下午4:53写道:

> hi zilong:
> 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。
>
>
>  原始邮件
> 发件人: zilong xiao
> 收件人: user-zh
> 发送时间: 2020年9月25日(周五) 16:48
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀
> xiao cai  于2020年9月25日周五 下午4:43写道: > >
> 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
> > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re:
> 怎么样在Flink中使用java代码提交job到yarn > > >
> Java程序用process调用脚本提交任务没啥问题吧,获取jobId的问题,我理解可以用yarn rest api & >
> flink rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五 >
> 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > >
> 想要在自己的项目中(springboot)提交flink > >
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > >
> > > best, > xiao


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 zilong xiao
你提交的任务是可以指定job name的呀,你的job name和你的业务主键绑定就可以做到唯一了,然后根据这个关系查询即可,没记错-ynm
是指定job name的

xiao cai  于2020年9月25日周五 下午5:01写道:

> hi zilong:
> 通过process提交任务以后,通过rest
> api,如何知道哪一个是我提交的呢?如果这时有多个请求过来同时都提交了任务,怎么知道rest返回的application应该与哪一次提交对应呢?
>
>
>  原始邮件
> 发件人: zilong xiao
> 收件人: user-zh
> 发送时间: 2020年9月25日(周五) 16:55
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> 我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest api啊,这些信息都是可以拿到的 xiao cai <
> flin...@163.com> 于2020年9月25日周五 下午4:53写道: > hi zilong: >
> 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 > > > 原始邮件 > 发件人: zilong xiao<
> acidzz...@gmail.com> > 收件人: user-zh > 发送时间:
> 2020年9月25日(周五) 16:48 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > >
> JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀
> > xiao cai  于2020年9月25日周五 下午4:43写道: > > >
> 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
> > > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh< >
> user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: >
> 怎么样在Flink中使用java代码提交job到yarn > > > >
> Java程序用process调用脚本提交任务没啥问题吧,获取jobId的问题,我理解可以用yarn rest api & >
> > flink rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五
> > > 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > >
> 想要在自己的项目中(springboot)提交flink > > >
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> > > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 >
> > > > > best, > xiao


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 zilong xiao
不太了解 YarnClusterDescripto~

xiao cai  于2020年9月25日周五 下午5:28写道:

> Hi zilong:
>
> 这种方式我考虑过,个人认为平台层面如果有业务逻辑的侵入,会影响后续的升级。所以我们是在标注输出中正则匹配出jobId和applicationId。你了解YarnClusterDescripto吗?之前社区看到有人用这个提交的。
>
>
>  原始邮件
> 发件人: zilong xiao
> 收件人: user-zh
> 发送时间: 2020年9月25日(周五) 17:12
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> 你提交的任务是可以指定job name的呀,你的job name和你的业务主键绑定就可以做到唯一了,然后根据这个关系查询即可,没记错-ynm
> 是指定job name的 xiao cai  于2020年9月25日周五 下午5:01写道: > hi
> zilong: > 通过process提交任务以后,通过rest >
> api,如何知道哪一个是我提交的呢?如果这时有多个请求过来同时都提交了任务,怎么知道rest返回的application应该与哪一次提交对应呢? >
> > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:55 > 主题: Re:
> 怎么样在Flink中使用java代码提交job到yarn > > > 我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest
> api啊,这些信息都是可以拿到的 xiao cai < > flin...@163.com> 于2020年9月25日周五 下午4:53写道: >
> hi zilong: > > 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 > > > 原始邮件 >
> 发件人: zilong xiao< > acidzz...@gmail.com> > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: > 2020年9月25日(周五) 16:48 > 主题: Re:
> 怎么样在Flink中使用java代码提交job到yarn > > > > JobId是说Flink的JobId还是yarn上面的application
> ID呢?containerID可以通过yarn rest api拿到呀 > > xiao cai 
> 于2020年9月25日周五 下午4:43写道: > > > >
> 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
> > > > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh< >
> > user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: > >
> 怎么样在Flink中使用java代码提交job到yarn > > > > >
> Java程序用process调用脚本提交任务没啥问题吧,获取jobId的问题,我理解可以用yarn rest api & >
> > > flink rest api来完成,希望对你有帮助,祝好~ xiao cai 
> 于2020年9月25日周五 > > > 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > >
> > 想要在自己的项目中(springboot)提交flink > > > >
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> > > > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢
> > > > > > > best, > xiao


Submit New Job -> Show Plan源码

2020-09-25 文章 zilong xiao
[image: image.png]
Hi, 想问下社区有大佬知道Show Plan调用接口的源码在哪块吗?


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 zilong xiao
Java程序用process调用脚本提交任务没啥问题吧,获取jobId的问题,我理解可以用yarn rest api &
flink rest api来完成,希望对你有帮助,祝好~

xiao cai  于2020年9月25日周五 下午4:23写道:

> Hi all:
> 大家好,我目前遇到一个flink 任务提交方面的困扰:
> 想要在自己的项目中(springboot)提交flink
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。
> 非常感谢
>
>
> best,
> xiao


Re: 关于如何贡献社区

2020-09-27 文章 zilong xiao
https://mp.weixin.qq.com/s/EYoiqyRHt3ahiL4K4a3CWA 可以看看这个文章

jinhai wang  于2020年9月27日周日 下午5:45写道:

> @Kyle Zhang
>
> 你需要在jira上说明自己的看法或者修复方案  然后commiter会分配给你
>
>
>
> Best Regards
>
> jinhai...@gmail.com
>
> > 2020年9月27日 下午5:43,Kyle Zhang  写道:
> >
> > Hi,
> >  我在jira上建了一个issue(FLINK-19433
> > 
> > ),后续如何跟进呢,是否要有commiter把任务指配给我?
> >
> > Best regards
>
>


Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-27 文章 zilong xiao
Hi Asahi Lee
 你需要确认下kafka topic的分区数是多少,如果是1,那就需要设置下rebalance,让每个tm都有数据流入

Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道:

> 你好!
>   我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
>   使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?


Re: flink基于源码如何编译打包生成flink-table-blink.jar

2020-09-28 文章 zilong xiao
Hi XiaChang
 
你可以对整个Flink工程打包,然后在flink-dist/target/flink-${version}-bin/flink-${version}/lib中找到,希望对你有帮助~

祝好

XiaChang <13628620...@163.com> 于2020年9月29日周二 上午10:46写道:

> 基于flink源码 如何编译打包生成flink-table-blink.jar
>
> 源码中,flink-table是多模块的,正常打包(mvn clean install -DskipTests
> -Dfast)生成的是每个模块单独的jar,而不是flink-table-blink.jar
>
> 请问如何打包才能生成flink-table-blink.jar
>
>
>
>
>
>


Flink 1.11 datastream写hive parquet表异常

2020-09-16 文章 zilong xiao
[image: image.png]


Re: UDF:Type is not supported: ANY

2020-08-03 文章 zilong xiao
不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题

godfrey he  于2020年8月3日周一 下午7:50写道:

> 你把Map换为Map试试
>
> zilong xiao  于2020年8月3日周一 下午4:56写道:
>
> > 目前转List可以用数组代替,Map貌似没法成功运行
> >
> > zilong xiao  于2020年8月3日周一 上午10:43写道:
> >
> > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is
> > not
> > > supported:
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > Json2Map
> > > udf应该怎么操作呢?求前辈指导
> > >
> > > udfd代码如下:
> > >
> > > public class Json2List extends ScalarFunction {
> > >
> > >private static final Logger LOG =
> > LoggerFactory.getLogger(Json2List.class);
> > >
> > >private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> true) ;
> > >
> > >public Json2List(){}
> > >
> > >public List eval(String param) {
> > >   List result = new ArrayList<>();
> > >   try {
> > >  List> list =
> OBJECT_MAPPER.readValue(param,
> > List.class);
> > >  for(Map map : list){
> > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > >  }
> > >  return result;
> > >   } catch (JsonProcessingException e){
> > >  LOG.error("failed to convert json to array, param is: {}",
> > param, e);
> > >   }
> > >   return result;
> > >}
> > >
> > >
> > >@Override
> > >public TypeInformation> getResultType(Class[]
> > signature) {
> > >   return Types.LIST(Types.STRING);
> > >}
> > >
> > > }
> > >
> > >
> >
>


Re: Flink sql 转义字符问题

2020-08-01 文章 zilong xiao
非常感谢您的回复,恰好我的SQL中既用到了;
也用到了\n,但是非常奇怪的是,直接使用时前者会报语法异常,而后者不会,在把;使用Unicode转义后,作业能够正常运行,且数据处理也ok,flink
版本是 1.10,初步判断可能是前端SQL传递有问题

Leonard Xu  于2020年7月31日周五 下午9:13写道:

> Hi, zilong
>
> SPLIT_INDEX(${xxx}, ‘;’, 0)
>
>  ‘;’ 分号不是特殊字符,编译时应该不会报错的,我在Flink 1.11.1 用DDL 测试了下, 能够work的,不知道你的环境是怎样的。
>   U&'\003B'  是 ; 的 unicode编码,所以用这个unicode编码是可以的,但一般这种用法是在需要用不可见字符分割时我们这样使用,
>   比如 \n 对应的s是 U&'\\000A’ ,\r 对应的是 U&'\\000D’,
> 对于分号这种可见字符来讲,不需要用unicode编码就可以的。
>
> 祝好
> Leonard
>
> > 在 2020年7月31日,20:46,zilong xiao  写道:
> >
> > U&'\003B'  这么写就可以了 感觉好奇怪啊。。
> >
> > 李奇 <359502...@qq.com> 于2020年7月31日周五 下午8:25写道:
> >
> >> 加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。
> >>
> >>> 在 2020年7月31日,下午8:13,zilong xiao  写道:
> >>>
> >>> SPLIT_INDEX(${xxx}, ';',
> >>>
> >>
> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~
> >>
>
>


Re: UDF:Type is not supported: ANY

2020-08-05 文章 zilong xiao
我这么写过,貌似不行,下面是我的代码,可否看下是否可行?

public class Json2Map extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2Map.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

   public Json2Map(){}

   public Map eval(String param) {
  Map result = new HashMap<>();
  try {
 if (param == null) {
return result;
 }
 result = OBJECT_MAPPER.readValue(param, Map.class);
 // 遍历toString 貌似也不行
 //for(Object obj : tmp.keySet()){
 // result.put(String.valueOf(obj), String.valueOf(tmp.get(obj)));
 //}
 LOG.info("result is: {}", result);
  } catch (JsonProcessingException e){
 LOG.error("failed to convert json to map, param is: {}", param, e);
  }
  return result;
   }


   @Override
   public TypeInformation>
getResultType(Class[] signature) {
  return Types.MAP(Types.STRING, Types.STRING);
   }

}


Benchao Li  于2020年8月6日周四 上午11:04写道:

> 可以直接返回Map类型呀,比如:
>
> public class String2Map extends ScalarFunction {
>
>public Map eval(String param) throws Exception {
>   Map map = new HashMap<>();
>   // ...
>   return map;
>}
>
>@Override
>public TypeInformation getResultType(Class[] signature) {
>   return Types.MAP(Types.STRING, Types.STRING);
>}
>
> }
>
>
> zilong xiao  于2020年8月6日周四 上午10:24写道:
>
> >
> >
> 感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值
> >
> > Benchao Li  于2020年8月5日周三 下午11:49写道:
> >
> > > Hi zilong,
> > >
> > > SQL里面的ARRAY类型,对应的legacy
> > type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> > > 其他类型的type information会被当做any类型来处理。
> > > 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> > > 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-18417
> > >
> > > zilong xiao  于2020年8月3日周一 下午8:23写道:
> > >
> > > > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> > > >
> > > > godfrey he  于2020年8月3日周一 下午7:50写道:
> > > >
> > > > > 你把Map换为Map试试
> > > > >
> > > > > zilong xiao  于2020年8月3日周一 下午4:56写道:
> > > > >
> > > > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > > > >
> > > > > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > > > > >
> > > > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> > > `Type
> > > > is
> > > > > > not
> > > > > > > supported:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > > > >
> > > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > > > Json2Map
> > > > > > > udf应该怎么操作呢?求前辈指导
> > > > > > >
> > > > > > > udfd代码如下:
> > > > > > >
> > > > > > > public class Json2List extends ScalarFunction {
> > > > > > >
> > > > > > >private static final Logger LOG =
> > > > > > LoggerFactory.getLogger(Json2List.class);
> > > > > > >
> > > > > > >private static final ObjectMapper OBJECT_MAPPER = new
> > > > ObjectMapper()
> > > > > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > > > >
>  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > > > true) ;
> > > > > > >
> > > > > > >public Json2List(){}
> > > > > > >
> > > > > > >public List eval(String param) {
> > > > > > >   List result = new ArrayList<>();
> > > > > > >   try {
> > > > > > >  List> list =
> > > > > OBJECT_MAPPER.readValue(param,
> > > > > > List.class);
> > > > > > >  for(Map map : list){
> > > > > > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > > > >  }
> > > > > > >  return result;
> > > > > > >   } catch (JsonProcessingException e){
> > > > > > >  LOG.error("failed to convert json to array, param is:
> > {}",
> > > > > > param, e);
> > > > > > >   }
> > > > > > >   return result;
> > > > > > >}
> > > > > > >
> > > > > > >
> > > > > > >@Override
> > > > > > >public TypeInformation>
> getResultType(Class[]
> > > > > > signature) {
> > > > > > >   return Types.LIST(Types.STRING);
> > > > > > >}
> > > > > > >
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: UDF:Type is not supported: ANY

2020-08-05 文章 zilong xiao
感谢benchao大佬回答,对于Java用户而言,Map容器用的也比较多,目前有用户需要从json字符串中取对应key的值,目前的做法是提供了一个getValueFromJson的UDF,每次要取值时都要将json反序列化成Map然后get(key),有什么方法可以使得UDF直接返回Map类型吗?这样就可以避免每次都去反序列化然后取值

Benchao Li  于2020年8月5日周三 下午11:49写道:

> Hi zilong,
>
> SQL里面的ARRAY类型,对应的legacy type,应该是Types.PRIMITIVE_ARRAY或者Types.OBJECT_ARRAY,
> 其他类型的type information会被当做any类型来处理。
> 这里应该跟泛型没有关系,就是在实现的时候并没有考虑将Types.LIST(Types.STRING)当做SQL里面的ARRAY类型。
> 支持List作为ARRAY的数据,应该要在1.12才能支持[1]。
>
> [1] https://issues.apache.org/jira/browse/FLINK-18417
>
> zilong xiao  于2020年8月3日周一 下午8:23写道:
>
> > 不行的,试过了,遍历map把里面的entity全部toString都不行,应该是Java泛型机制的原因,不知道社区大佬怎么看待这个问题
> >
> > godfrey he  于2020年8月3日周一 下午7:50写道:
> >
> > > 你把Map换为Map试试
> > >
> > > zilong xiao  于2020年8月3日周一 下午4:56写道:
> > >
> > > > 目前转List可以用数组代替,Map貌似没法成功运行
> > > >
> > > > zilong xiao  于2020年8月3日周一 上午10:43写道:
> > > >
> > > > > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > > > > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报
> `Type
> > is
> > > > not
> > > > > supported:
> > > > >
> > > >
> > >
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > > > >
> STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> > > > Json2Map
> > > > > udf应该怎么操作呢?求前辈指导
> > > > >
> > > > > udfd代码如下:
> > > > >
> > > > > public class Json2List extends ScalarFunction {
> > > > >
> > > > >private static final Logger LOG =
> > > > LoggerFactory.getLogger(Json2List.class);
> > > > >
> > > > >private static final ObjectMapper OBJECT_MAPPER = new
> > ObjectMapper()
> > > > >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> > > > >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,
> > > true) ;
> > > > >
> > > > >public Json2List(){}
> > > > >
> > > > >public List eval(String param) {
> > > > >   List result = new ArrayList<>();
> > > > >   try {
> > > > >  List> list =
> > > OBJECT_MAPPER.readValue(param,
> > > > List.class);
> > > > >  for(Map map : list){
> > > > > result.add(OBJECT_MAPPER.writeValueAsString(map));
> > > > >  }
> > > > >  return result;
> > > > >   } catch (JsonProcessingException e){
> > > > >  LOG.error("failed to convert json to array, param is: {}",
> > > > param, e);
> > > > >   }
> > > > >   return result;
> > > > >}
> > > > >
> > > > >
> > > > >@Override
> > > > >public TypeInformation> getResultType(Class[]
> > > > signature) {
> > > > >   return Types.LIST(Types.STRING);
> > > > >}
> > > > >
> > > > > }
> > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Flink sql 转义字符问题

2020-07-31 文章 zilong xiao
SPLIT_INDEX(${xxx}, ';',
0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~


Re: Flink sql 转义字符问题

2020-07-31 文章 zilong xiao
U&'\003B'  这么写就可以了 感觉好奇怪啊。。

李奇 <359502...@qq.com> 于2020年7月31日周五 下午8:25写道:

> 加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。
>
> > 在 2020年7月31日,下午8:13,zilong xiao  写道:
> >
> > SPLIT_INDEX(${xxx}, ';',
> >
> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~
>


Re: Flink sql 转义字符问题

2020-07-31 文章 zilong xiao
实测反斜杠好像也不行

李奇 <359502...@qq.com> 于2020年7月31日周五 下午8:25写道:

> 加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。
>
> > 在 2020年7月31日,下午8:13,zilong xiao  写道:
> >
> > SPLIT_INDEX(${xxx}, ';',
> >
> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~
>


UDF:Type is not supported: ANY

2020-08-02 文章 zilong xiao
最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
supported:
ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
udf应该怎么操作呢?求前辈指导

udfd代码如下:

public class Json2List extends ScalarFunction {

   private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);

   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
  .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
  .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;

   public Json2List(){}

   public List eval(String param) {
  List result = new ArrayList<>();
  try {
 List> list =
OBJECT_MAPPER.readValue(param, List.class);
 for(Map map : list){
result.add(OBJECT_MAPPER.writeValueAsString(map));
 }
 return result;
  } catch (JsonProcessingException e){
 LOG.error("failed to convert json to array, param is: {}", param, e);
  }
  return result;
   }


   @Override
   public TypeInformation> getResultType(Class[] signature) {
  return Types.LIST(Types.STRING);
   }

}


Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 zilong xiao
想问下你是用的什么类型的配置 xml、yaml还是properties呢?

caozhen  于2020年8月14日周五 上午9:58写道:

> 我最后用的是log4j2。
>
>
> 之前mainjar中有很多log4j的依赖(slf4j-log4j12),而flink客户端lib下是log4j2的依赖(log4j-slf4j-impl),导致了冲突,不能打印日志。
>
> 改动:把mainjar中的log4j的依赖改成provided,使用了客户端提供的log4j2依赖
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 zilong xiao
我也是用的properties配置文件,可是日志貌似没收集到,有什么方法可以判断配置文件是否生效吗 ?

caozhen  于2020年8月14日周五 上午10:23写道:

> log4j2的配置:我是直接用的flink1.11.1客户端提供的log4j-console.properties。
>
> 如果你是用的xml、yaml文件,在客户端提交作业时可能要指定一下日志文件,也可以改下flink启动脚本的日志设置
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: UDF:Type is not supported: ANY

2020-08-03 文章 zilong xiao
目前转List可以用数组代替,Map貌似没法成功运行

zilong xiao  于2020年8月3日周一 上午10:43写道:

> 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is not
> supported:
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array Json2Map
> udf应该怎么操作呢?求前辈指导
>
> udfd代码如下:
>
> public class Json2List extends ScalarFunction {
>
>private static final Logger LOG = LoggerFactory.getLogger(Json2List.class);
>
>private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
>   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
>   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;
>
>public Json2List(){}
>
>public List eval(String param) {
>   List result = new ArrayList<>();
>   try {
>  List> list = OBJECT_MAPPER.readValue(param, 
> List.class);
>  for(Map map : list){
> result.add(OBJECT_MAPPER.writeValueAsString(map));
>  }
>  return result;
>   } catch (JsonProcessingException e){
>  LOG.error("failed to convert json to array, param is: {}", param, e);
>   }
>   return result;
>}
>
>
>@Override
>public TypeInformation> getResultType(Class[] signature) {
>   return Types.LIST(Types.STRING);
>}
>
> }
>
>


flink SQL如何将秒转换为timestamp

2020-06-30 文章 zilong xiao
有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导

TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
hh:mm:ss'))


Re: flink SQL如何将秒转换为timestamp

2020-06-30 文章 zilong xiao
好的,我试试~

王松  于2020年6月30日周二 下午5:35写道:

> 可以试试这样写:
> TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss'))
>
> zilong xiao  于2020年6月30日周二 下午4:30写道:
>
> >
> 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导
> >
> > TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
> > hh:mm:ss'))
> >
>


Flink SQL如何将多个表的查询结果(列不同)聚合成一张表

2020-07-08 文章 zilong xiao
列如下面这样,需要查询table1 & table2,分别查询不同的字段
在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
(
(SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS
table_tmp2,
)as a


flink-benchmarks使用求助

2020-07-10 文章 zilong xiao
如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二


Re: flink-benchmarks使用求助

2020-07-13 文章 zilong xiao
是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?

Congxian Qiu  于2020年7月10日周五 下午7:18写道:

> Hi
> 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
>
> [1] https://github.com/dataArtisans/flink-benchmarks
> [2] http://openjdk.java.net/projects/code-tools/jmh/
>
> Best,
> Congxian
>
>
> zilong xiao  于2020年7月10日周五 下午3:54写道:
>
> > 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> >
>


Re: flink-benchmarks使用求助

2020-07-13 文章 zilong xiao
`-t max`之后出现的~ 改小并发后貌似没问题

Congxian Qiu  于2020年7月13日周一 下午8:14写道:

> Hi
>
> 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?
>
> Best,
> Congxian
>
>
> zilong xiao  于2020年7月13日周一 下午2:32写道:
>
> > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> > timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
> >
> > Congxian Qiu  于2020年7月10日周五 下午7:18写道:
> >
> > > Hi
> > > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> > > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> > >
> > > [1] https://github.com/dataArtisans/flink-benchmarks
> > > [2] http://openjdk.java.net/projects/code-tools/jmh/
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > zilong xiao  于2020年7月10日周五 下午3:54写道:
> > >
> > > >
> 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > > >
> > >
> >
>


Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 zilong xiao
topic是几个分区呢?如果是一个分区,要加一个rebalance参数吧?

wind.fly@outlook.com  于2020年7月13日周一 上午11:46写道:

> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
>   x.report.bi_report_fence_common_indicators
> select
>   fence_id,
>   'finishedOrderCnt' as indicator_name,
>   TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>   count(1) as indicator_val
> from
>   (
> select
>   dt,
>   fence_id,
>   fence_coordinates_array,
>   c.driver_location
> from
>   (
> select
>   *
> from
>   (
> select
>   dt,
>   driver_location,
>   r1.f1.fence_info as fence_info
> from
>   (
> select
>   o.dt,
>   o.driver_location,
>   MD5(r.city_code) as k,
>   PROCTIME() as proctime
> from
>   (
> select
>   order_no,
>   dt,
>   driver_location,
>   PROCTIME() as proctime
> from
>   x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
> where
>   _type = 'insert'
>   and event_code = 'arriveAndSettlement'
>   ) o
>   LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME
> AS OF o.proctime AS r ON r.order_no = o.order_no
>   ) o1
>   LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime
> AS r1 ON r1.k = o1.k
>   ) a
> where
>   fence_info is not null
>   ) c
>   LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id,
> fence_coordinates_array) ON TRUE
>   ) as b
> where
>   in_fence(fence_coordinates_array, driver_location)
> group by
>   TUMBLE(dt, INTERVAL '5' MINUTE),
>   fence_id;
>其中
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
>CREATE TABLE
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>   _type STRING,
>   _old_id BIGINT,
>   id BIGINT,
>   _old_order_no STRING,
>   order_no STRING,
>   _old_event_code STRING,
>   event_code STRING,
>   _old_from_state TINYINT,
>   from_state TINYINT,
>   _old_to_state TINYINT,
>   to_state TINYINT,
>   _old_operator_type TINYINT,
>   operator_type TINYINT,
>   _old_passenger_location STRING,
>   passenger_location STRING,
>   _old_driver_location STRING,
>   driver_location STRING,
>   _old_trans_time STRING,
>   trans_time STRING,
>   _old_create_time STRING,
>   create_time STRING,
>   _old_update_time STRING,
>   update_time STRING,
>   _old_passenger_poi_address STRING,
>   passenger_poi_address STRING,
>   _old_passenger_detail_address STRING,
>   passenger_detail_address STRING,
>   _old_driver_poi_address STRING,
>   driver_poi_address STRING,
>   _old_driver_detail_address STRING,
>   driver_detail_address STRING,
>   _old_operator STRING,
>   operator STRING,
>   _old_partition_index TINYINT,
>   partition_index TINYINT,
>   dt as TO_TIMESTAMP(trans_time),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '*',
>   'connector.properties.zookeeper.connect' = '*',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'x'
> )
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
是用的1.10.0版本,我尝试切到1.10.1试试看,请问这个有对应的issue吗?想深入了解下这个问题

Benchao Li  于2020年6月16日周二 下午5:00写道:

> 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
>
> zilong xiao  于2020年6月16日周二 下午4:56写道:
>
>> 如题,在SQL
>> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
>> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
>> 代码如下图:
>> [image: image.png]
>> 异常堆栈:
>>
>


Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
如题,在SQL
ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
代码如下图:
[image: image.png]
异常堆栈:


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
看了下issue,跟我描述的问题很相似,我尝试切到1.10.1试试看,谢谢您的解惑

Kurt Young  于2020年6月16日周二 下午5:15写道:

> 应该是这个: https://issues.apache.org/jira/browse/FLINK-16068
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:
>
> > 我看了下1.10.1的release note,您说的应该就是这个issue:
> > https://issues.apache.org/jira/browse/FLINK-16345
> > ,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
> > DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?
> >
> > Benchao Li  于2020年6月16日周二 下午5:00写道:
> >
> > > 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
> > >
> > > zilong xiao  于2020年6月16日周二 下午4:56写道:
> > >
> > >> 如题,在SQL
> > >>
> >
> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
> > >> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
> > >> 代码如下图:
> > >> [image: image.png]
> > >> 异常堆栈:
> > >>
> > >
> >
>


Re: Flink SQL ddl 中含有关键字 且在ddl中使用TO_TIMESTAMP、TO_DATE函数语法检查异常问题

2020-06-16 文章 zilong xiao
我看了下1.10.1的release note,您说的应该就是这个issue:
https://issues.apache.org/jira/browse/FLINK-16345
,但是这个issue所描述的问题貌似和我的不太一样,我的这个问题是在使用TO_TIMESTAMP、TO_
DATE函数且,ddl中含有关键字字段时,语法检测会报错,不知道这个问题是否跟这个issue有关呢?

Benchao Li  于2020年6月16日周二 下午5:00写道:

> 你用的是1.10.0版本么?这个版本的计算列处理有点问题,建议用1.10.1尝试一下。
>
> zilong xiao  于2020年6月16日周二 下午4:56写道:
>
>> 如题,在SQL
>> ddl中如含有关键字字段(已用反引号括上),并且在ddl中使用了TO_TIMESTAMP、TO_DATE函数,在进行语法检查的时候会报关键字冲突异常,如果将图中的```itime
>> as TO_TIMESTAMP(time2)```去掉,语法检查正常通过,还麻烦各位大佬帮忙看看~
>> 代码如下图:
>> [image: image.png]
>> 异常堆栈:
>>
>


Re: flink1.11日志上报

2020-07-23 文章 zilong xiao
这个可以用配置文件实现,利用kafka
appender将日志打到kafka中,然后自己去消费kafka处理即可,1.11中支持log4j2了,建议使用log4j2

Dream-底限  于2020年7月24日周五 上午10:50写道:

> hi、
>
> 我这面想实现一个日志上报的功能,就是flink任务启动后,让flink主动将当前任务日志打到外部存储系统,想问一下flink有对应的接口吗,具体要实现哪一个类哪
>


Re: FlinkSQL 1.10 事件时间声明不能包含系统保留字

2020-12-29 文章 zilong xiao
没记错这是一个bug,计算列中含有关键字会异常,可以看下这个issue:
https://issues.apache.org/jira/browse/FLINK-16068

Robin Zhang  于2020年12月29日周二 下午6:56写道:

> -- 定义时间非系统保留字为事件时间字段,能正常运行
> create table events (
> process_time  bigint  comment '事件时间',
> event   string  comment '事件类型',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, '-MM-dd
> HH:mm:ss')),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) with (
>   ... ...
> );
>
> 但是,定义的字段是系统保留字时,就会报错:
> create table events (
> `time`  bigint  comment '事件时间',
>  eventstring  comment '事件类型',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, '-MM-dd HH:mm:ss')),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) with (
>   ... ...
> );
>
> 但现在问题是:神策埋点的事件时间字段是time,如果单独写一个程序转换字段的话,显得有些鸡肋。
> 不知道是不是bug,目前还没想到较好的解决方案。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink CVE补丁会打在1.10版本吗

2021-01-06 文章 zilong xiao
Hi Luna,
可以找到对应的commit,将其修改cherry pick到自己的工程重新打包

祝好~

Luna Wong  于2021年1月7日周四 上午10:44写道:

> Flink 那个两个CVE的Patch 会打在1.10.4这个版本吗。目前修复版本是 1.11.3 1.12.0 ,很多生成还都在1.10
> 不好直接升级的。
>


Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 zilong xiao
原来如此,我觉得是一个不错的想法,但是其实对用户来说,最好除了写SQL之外,其他事情都不要做是最好


Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 zilong xiao
Hi silence,

想问下为什么一定要submit参数呢?我理解如果是做平台的话,用户如果有多个jar依赖,为什么不把这些jar统一打包到任务主jar里呢?,平台可以提供一些公共依赖,比如flink,hadoop等

silence  于2020年11月30日周一 下午5:20写道:

> 看了很多同学回复yarn的解决方案
>
> 我这再补充一下:
> 还是希望可以提供更通用的submit参数来解决此问题,
> 包括提交到standalone集群时可以额外指定本地依赖jar
>
> 有没有cli相关的同学可以跟进下建议
> 谢谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 zilong xiao
好的,感谢Benchao的解答~

Benchao Li  于2020年11月24日周二 下午7:49写道:

> 从这一行代码看出来的:
>
> https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107
>
> 现在社区还没有正式支持ProtoBuf Format,不过已经有相关issue和讨论了[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-18202
>
> zilong xiao  于2020年11月24日周二 下午4:46写道:
>
> > 这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢?
> >
> > Benchao Li  于2020年11月24日周二 下午4:33写道:
> >
> > > 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。
> > >
> > > zilong xiao  于2020年11月24日周二 下午4:13写道:
> > >
> > > > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧,
> > > > https://github.com/yangyichao-mango/flink-protobuf
> > > >
> > > > Benchao Li  于2020年11月24日周二 下午3:43写道:
> > > >
> > > > > 看起来你的DDL写的没有什么问题。
> > > > >
> > > > > 你用的是哪个Flink版本呢?
> > > > > 此外就是可以发下更完整的异常栈么?
> > > > >
> > > > > zilong xiao  于2020年11月24日周二 下午2:54写道:
> > > > >
> > > > > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
> > > > > >
> > > > > > Benchao Li  于2020年11月24日周二 下午2:49写道:
> > > > > >
> > > > > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> > > > > > >
> > > > > > > zilong xiao  于2020年11月24日周二 上午10:49写道:
> > > > > > >
> > > > > > > > [image: image.png]
> > > > > > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Best,
> > > > > > > Benchao Li
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 zilong xiao
这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢?

Benchao Li  于2020年11月24日周二 下午4:33写道:

> 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。
>
> zilong xiao  于2020年11月24日周二 下午4:13写道:
>
> > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧,
> > https://github.com/yangyichao-mango/flink-protobuf
> >
> > Benchao Li  于2020年11月24日周二 下午3:43写道:
> >
> > > 看起来你的DDL写的没有什么问题。
> > >
> > > 你用的是哪个Flink版本呢?
> > > 此外就是可以发下更完整的异常栈么?
> > >
> > > zilong xiao  于2020年11月24日周二 下午2:54写道:
> > >
> > > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
> > > >
> > > > Benchao Li  于2020年11月24日周二 下午2:49写道:
> > > >
> > > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> > > > >
> > > > > zilong xiao  于2020年11月24日周二 上午10:49写道:
> > > > >
> > > > > > [image: image.png]
> > > > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~

Benchao Li  于2020年11月24日周二 下午2:49写道:

> 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
>
> zilong xiao  于2020年11月24日周二 上午10:49写道:
>
> > [image: image.png]
> > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 zilong xiao
用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧,
https://github.com/yangyichao-mango/flink-protobuf

Benchao Li  于2020年11月24日周二 下午3:43写道:

> 看起来你的DDL写的没有什么问题。
>
> 你用的是哪个Flink版本呢?
> 此外就是可以发下更完整的异常栈么?
>
> zilong xiao  于2020年11月24日周二 下午2:54写道:
>
> > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
> >
> > Benchao Li  于2020年11月24日周二 下午2:49写道:
> >
> > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> > >
> > > zilong xiao  于2020年11月24日周二 上午10:49写道:
> > >
> > > > [image: image.png]
> > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


taskmanager.out配置滚动

2020-12-03 文章 zilong xiao
想问下社区的大佬,标准输出文件taskmanager.out可以配置成滚动的吗?


Flink 1.11版本LeaseRenewer线程不释放

2020-12-07 文章 zilong xiao
在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread Dump发现有很多名为LeaseRenewer
的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?

Flink version: 1.11
State backend:filesystem
checkpoint interval: 60s


Re: taskmanager.out配置滚动

2020-12-03 文章 zilong xiao
好的,了解了,感谢您的解答

Yang Wang  于2020年12月4日周五 上午10:33写道:

> 目前是支持不了的,因为STDOUT/STDERR本身并不是通过slf4j来写的
> 如果要支持是需要在Flink代码里面将Stdout重定向之后,再配置log4j才能解决
>
> Best,
> Yang
>
> zilong xiao  于2020年12月3日周四 下午7:50写道:
>
> > 想问下社区的大佬,标准输出文件taskmanager.out可以配置成滚动的吗?
> >
>


Re: Flink 1.11版本LeaseRenewer线程不释放

2020-12-07 文章 zilong xiao
Hi Paul,
线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
cause。。

另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?

Paul Lam  于2020年12月8日周二 上午10:45写道:

> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
>
> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
>
> Best,
> Paul Lam
>
> > 2020年12月7日 18:11,zilong xiao  写道:
> >
> > 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread Dump发现有很多名为LeaseRenewer
> > 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
> >
> > Flink version: 1.11
> > State backend:filesystem
> > checkpoint interval: 60s
>
>


Re: Flink 1.11版本LeaseRenewer线程不释放

2020-12-07 文章 zilong xiao
附一张有问题container的线程监控图
[image: image.png]

zilong xiao  于2020年12月8日周二 上午11:03写道:

> Hi Paul,
> 线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> cause。。
>
> 另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?
>
> Paul Lam  于2020年12月8日周二 上午10:45写道:
>
>> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
>>
>> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
>>
>> Best,
>> Paul Lam
>>
>> > 2020年12月7日 18:11,zilong xiao  写道:
>> >
>> > 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread
>> Dump发现有很多名为LeaseRenewer
>> > 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
>> >
>> > Flink version: 1.11
>> > State backend:filesystem
>> > checkpoint interval: 60s
>>
>>


Re: Flink 1.11版本LeaseRenewer线程不释放

2020-12-08 文章 zilong xiao
作业数据流是 kafka -> flink ->
http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 我再debug看看~

Paul Lam  于2020年12月8日周二 下午6:00写道:

> Hi,
>
> 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
>
> Best,
> Paul Lam
>
> > 2020年12月8日 11:03,zilong xiao  写道:
> >
> > Hi Paul,
> >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> >
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> > cause。。
> >
> >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?
> >
> > Paul Lam  于2020年12月8日周二 上午10:45写道:
> >
> >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
> >>
> >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
> >>
> >> Best,
> >> Paul Lam
> >>
> >>> 2020年12月7日 18:11,zilong xiao  写道:
> >>>
> >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread
> Dump发现有很多名为LeaseRenewer
> >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
> >>>
> >>> Flink version: 1.11
> >>> State backend:filesystem
> >>> checkpoint interval: 60s
> >>
> >>
>
>


测试用例调试问题

2020-11-24 文章 zilong xiao
本地运行测试用例有时会有一堆Scala文件报错,但是整体工程编译又没问题,求大佬解答这种情况该怎么办呢?能忽略Scala文件吗?


Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
[image: image.png]
如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。


Re: Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 zilong xiao
Hi Jacob
1.可否发现使用的配置?
2.检查下jobmanager.err日志,看下日志的绑定是否正确

Jacob <17691150...@163.com> 于2020年12月16日周三 下午4:01写道:

> 
>
> Hello everyone!
>
>
> 如上图所示,升级后的flink,为什么看不到taskmanager的日志了。在Stdout中能看自己代码中打的log,但flink自身的log以及springboot相关的log等,都无法看到,不知何因?升级后日志系统需要重新配置吗?
>
>
> Thanks!
> Jacob
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: taskmanager.out配置滚动

2020-12-22 文章 zilong xiao
恩恩,这个场景是有的,目前看是可以通过重定向后实现,follow issue~

李杰  于2020年12月22日周二 下午3:58写道:

> Hi,
> 这个功能我们之前做过,可以看下这里。
> https://issues.apache.org/jira/browse/FLINK-20713
>
> zilong xiao  于2020年12月3日周四 下午7:50写道:
>
> > 想问下社区的大佬,标准输出文件taskmanager.out可以配置成滚动的吗?
> >
>


Re: Re: taskmanager.out配置滚动

2020-12-22 文章 zilong xiao
为啥1.11可以呢?

hdxg1101300123  于2020年12月23日周三 下午1:51写道:

> 1.11可以
>
>
>
> 发自vivo智能手机
> > 之前在社区我提过一次redirect的方案,但其他人有一些concerns,可以参考一下
> >
> > https://github.com/apache/flink/pull/11839#pullrequestreview-399769862
> >
> > zilong xiao  于2020年12月22日周二 下午4:13写道:
> >
> > > 恩恩,这个场景是有的,目前看是可以通过重定向后实现,follow issue~
> > >
> > > 李杰  于2020年12月22日周二 下午3:58写道:
> > >
> > > > Hi,
> > > > 这个功能我们之前做过,可以看下这里。
> > > > https://issues.apache.org/jira/browse/FLINK-20713
> > > >
> > > > zilong xiao  于2020年12月3日周四 下午7:50写道:
> > > >
> > > > > 想问下社区的大佬,标准输出文件taskmanager.out可以配置成滚动的吗?
> > > > >
> > > >
> > >
>


Re: SQL从1.9迁移到1.11的问题

2020-11-10 文章 zilong xiao
关于1.11 获取执行计划,我向社区提了一个issue:
https://issues.apache.org/jira/browse/FLINK-19687,我觉得这个应该是需要支持的,可以关注下

izual  于2020年10月30日周五 下午5:04写道:

> hi,Community:
>
>
> 我们目前使用的是 flink 1.9.1 执行 SQL 任务,主要使用了以下几种接口:
> 1. sqlQuery sqlUpdate: 执行表的创建、查找和写入
> 2. toAppendStream/toRetractStream:将表转换为流后,通过 DataStream.addSink(new
> RichSinkFunction )写入
> 3. registerDataStream:将流注册为表,下一步使用 sqlQuery/sqlUpdate 读写该表
>
>
> 最后通过 env.execute() 或者 tableEnv.execute() 执行:通过 RichSinkFunction.invoke 或者
> sqlUpdate(DML) 更新到存储,这两种输出形式都可能多次调用。
>
>
> 看到文档里,这部分接口 [1][2] 的行为有些变化,实际使用1.11后,有几处困惑想请教:
>
>
> 1. 如果预期混用 SQL/DataStream 的接口,我目前按照3里的介绍,使用 sqlUpdate,然后通过 tEnv.execute()
> 来输出。具体的,程序设置两个输出,分别是 RichSinkFunction.invoke 以及 sqlUpdate,观察到只有 sqlUpdate
> 更新了数据,RichSinkFunction 没有执行。如果希望同时输出的话,是必须将 RichSinkFunction.invoke
> 的部分也都实现为 StreamTableSink 么,是否有其他成本较低的迁移方式?如果按照 1.11 区分 env/tableEnv
> 的思路,这种情况怎么实现更加合理?
> 2. 对于这种情况,env.getExecutionPlan 获取的只是调用 DataStream 接口的 DAG 图,如果要获取 Table
> 操作流程的 DAG,应该通过 tableEnv 的哪个接口获取?
>
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#corrected-execution-behavior-of-tableenvironmentexecute-and-streamtableenvironmentexecute-flink-16363
> 2.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> 3.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
>
>


Re: flink web ui 页面按钮控制

2020-11-18 文章 zilong xiao
Hi  祁洁


*  想问下你是怎么解决的呢?Thanks~*

祁洁 <1241502...@qq.com> 于2020年11月18日周三 下午5:13写道:

> 已解决。
>
>
>
>
> --原始邮件--
> 发件人:
>   "祁洁"
>   <
> 1241502...@qq.com;
> 发送时间:2020年11月18日(星期三) 中午11:01
> 收件人:"user-zh"
> 主题:flink web ui 页面按钮控制
>
>
>
> 请教一下,如何隐藏flink dashboard上的cancel job按钮?


Re: 咨询关于flink究竟使用的是log4j1还是log4j2.

2021-01-25 文章 zilong xiao
Hi

flink从1.11开始应该支持log4j,logback,log4j2了,1.11之前的版本只支持前两者,log4j2也是可以用.properties配置的,现在1.12里的默认配置就是log4j2

祝好~

赵一旦  于2021年1月26日周二 下午1:27写道:

>
> 网上很多人说log4j2是使用.xml配置。但是flink的conf中只有properties,但是官方文档讲默认使用log4j2?搞蒙了,究竟用的哪个呢。
>


flink-sql-gateway相关问题

2021-01-26 文章 zilong xiao
请问有关于flink-sql-gateway rest api以pre job模式提交作业到yarn集群的文档吗?


Re: Flink 新版本的WEB UI taskmanager没有logs

2020-12-16 文章 zilong xiao
看样子slf4j最终是绑定到了logback实现,你的任务配置是用的logback吗?如果不是,需要把logback的依赖排除掉

Jacob <17691150...@163.com> 于2020年12月16日周三 下午5:38写道:

> 谢谢回复!
>
> 1. 在jobmanager.err中发现如下日志绑定,存在冲突。
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/24/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/***/appcache/application_1603495749855_62368/filecache/32/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
>
> 这个多绑定会影响吗?
>
> 2. 该版本使用的配置如下:
>
> env.java.home: /usr/java/jdk1.8.0_162
> yarn.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
> containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_162
> containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_162
>
>
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 3072m
> taskmanager.memory.process.size: 3072m
> taskmanager.numberOfTaskSlots: 4
>
> yarn.application-attempts: 10
> state.backend: filesystem
> state.checkpoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
> state.savepoints.dir: hdfs://nameservice1/prd/website/flink_checkpoint
> state.backend.incremental: false
> state.backend.fs.memory-threshold: 1024
> state.checkpoints.num-retained: 3
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 1000
> restart-strategy.fixed-delay.delay: 30 s
>
> jobmanager.execution.failover-strategy: region
>
>
> classloader.resolve-order: parent-first
>
> 3. job运行方式:on yarn
>
> 4. hadoop版本:2.6
>
> Thanks!
> Jacob
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink 1.11 session cluster相关问题

2021-02-01 文章 zilong xiao
请问社区大佬,1.11版本的session
cluster模式不支持在启动时指定启动taskmanager个数了吗?好像只能动态申请资源了?在1.4版本可以用-n,现在该参数已移除,为什么要这么做呢?我理解在启动一个session
cluster的同时申请好TM个数也是一种常见场景吧?

求社区大佬指点


Re: Flink 1.11 session cluster相关问题

2021-02-03 文章 zilong xiao
捞一下自己

zilong xiao  于2021年2月2日周二 上午10:35写道:

> 请问社区大佬,1.11版本的session
> cluster模式不支持在启动时指定启动taskmanager个数了吗?好像只能动态申请资源了?在1.4版本可以用-n,现在该参数已移除,为什么要这么做呢?我理解在启动一个session
> cluster的同时申请好TM个数也是一种常见场景吧?
>
> 求社区大佬指点
>


Re: flink-sql-gateway支持远程吗

2021-01-26 文章 zilong xiao
如果是yarn-pre-job,是如何提交到yarn集群的呢,跟host * rest port应该没关系了吧?

Sebastian Liu  于2021年1月27日周三 上午12:48写道:

> flink-sql-gateway在提交job时,会根据SPI加载executor,然后推断是local mode还是remote mode,
> 在remote mode下,根据FLINK_HOME 目录下config.sh推断的FLINK_CONF_DIR 寻找flink-conf.yaml,
> 其中的host, rest port决定了提交的远端集群
>
> 罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道:
>
> >
> >
> 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行
> >
> >
> > | |
> > 15927482803
> > |
> > |
> > 邮箱:15927482...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>
>
>
> --
>
> *With kind regards
> 
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com 
> QQ: 3239559*
>


Re: flink-sql-gateway相关问题

2021-01-26 文章 zilong xiao
感谢您的回答,我在flink-conf.yaml里指定"execution.target: yarn-pre-job"后,尝试用rest
api生成session id时会遇到异常,不清楚是为何,可否帮忙看下

flink version: 1.11.3
execution.target: yarn-pre-job
rest api请求路径和参数:
http://localhost:8083/v1/sessions
{
"planner": "blink",
"execution_type": "streaming"
   }

异常信息:Caused by: java.lang.IllegalStateException: No ClusterClientFactory
found. If you were targeting a Yarn cluster, please make sure to export the
HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For
more information refer to the "Deployment & Operations" section of the
official Apache Flink documentation.

Sebastian Liu  于2021年1月27日周三 上午1:01写道:

> sql gateway 提交job方式和flink client类似主要取决于flink-conf中的execution.config 配置,
> 对per job模式on yarn, 对应的配置是“yarn-per-job”,
>
> 这样加载的PipelineExecutor会控制为:YarnJobClusterExecutorFactory,具备向Yarn提交job的能力,这和使用flink
> client
> 提交job最终是一个调用路径,余下在flink-conf中添加上yarn相关配置我理解就可以了。
> org.apache.flink.yarn.configuration.YarnConfigOptions
>
> zilong xiao  于2021年1月26日周二 下午4:00写道:
>
> > 请问有关于flink-sql-gateway rest api以pre job模式提交作业到yarn集群的文档吗?
> >
>
>
> --
>
> *With kind regards
> 
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com 
> QQ: 3239559*
>


hbase async lookup能否保证输出结果有序?

2021-06-15 文章 zilong xiao
hi,社区大佬们好,想问下flink 1.13中hbase async lookup能否保证输出结果有序?


Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-10 文章 zilong xiao
hi,yidan

   可以在layout中读取系统环境变量作区分

祝好~

yidan zhao  于2021年6月10日周四 下午2:27写道:

> 我觉得还有个头疼的吧,你很多机器,怎么区分每个机器还得。哪个机器的JM/TM的日志。
>
> yujianbo <15205029...@163.com> 于2021年6月10日周四 下午1:48写道:
> >
> > log4j可以,log4j2也可以,现在头疼已经实现打kafka,不知道怎么区分这两边的日志
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: JDBC异步lookup join有FLIP或者迭代计划吗?

2021-06-10 文章 zilong xiao
想贡献+1,关注中...

Ada Luna  于2021年6月10日周四 下午2:42写道:

> 好的后续我会在这个ticket简述方案。
>
> Lin Li  于2021年6月10日周四 下午12:02写道:
> >
> > 社区之前有过基于 legacy source 的 pr
> > https://issues.apache.org/jira/browse/FLINK-14902, 不过目前没有进展, 欢迎贡献!
> > cc Guowei Ma
> >
> >
> > Luna Wong  于2021年6月10日周四 上午11:16写道:
> >
> > > 如果没有我用VertX和Druid连接池贡献下代码。这个要在dev邮件列表里讨论是吗
> > >
>


Re: hbase async lookup能否保证输出结果有序?

2021-06-17 文章 zilong xiao
好的,感谢Jark~

Jark Wu  于2021年6月18日周五 上午10:59写道:

> 可以看下 AsyncWaitOperator 的源码实现。
>
> Best,
> Jark
>
> On Tue, 15 Jun 2021 at 18:53, zilong xiao  wrote:
>
> > 想了解下这块如何保证100%有序的呢,感觉异步查询是不是无法保证查询结果的先后,比如网络原因等等。
> >
> > Jingsong Li  于2021年6月15日周二 下午5:07写道:
> >
> > > 是有序的。
> > >
> > > 无序的mode目前并没有支持, 目前可能会影响流计算的正确性
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jun 15, 2021 at 3:42 PM zilong xiao 
> wrote:
> > >
> > > > hi,社区大佬们好,想问下flink 1.13中hbase async lookup能否保证输出结果有序?
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>


Re: hbase async lookup能否保证输出结果有序?

2021-06-15 文章 zilong xiao
想了解下这块如何保证100%有序的呢,感觉异步查询是不是无法保证查询结果的先后,比如网络原因等等。

Jingsong Li  于2021年6月15日周二 下午5:07写道:

> 是有序的。
>
> 无序的mode目前并没有支持, 目前可能会影响流计算的正确性
>
> Best,
> Jingsong
>
> On Tue, Jun 15, 2021 at 3:42 PM zilong xiao  wrote:
>
> > hi,社区大佬们好,想问下flink 1.13中hbase async lookup能否保证输出结果有序?
> >
>
>
> --
> Best, Jingsong Lee
>


Re: Flink 1.11版本LeaseRenewer线程不释放

2021-05-13 文章 zilong xiao
并没有定位到具体原因,只能靠重启作业缓解。。。

zhisheng  于2021年5月13日周四 下午4:20写道:

> 你好,这个问题后来定位到问题了吗?
>
> 我们生产也有一个作业有这样的问题,Flink 版本是 1.10.0,这个作业是 JM 的线程数很多(快 6k),作业是 flink 读取
> Kafka,会关联 HBase ,开启了 Checkpoint,就这个作业有问题,很奇怪
>
> https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg
>
> zilong xiao  于2020年12月8日周二 下午6:21写道:
>
> > 作业数据流是 kafka -> flink ->
> > http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。
> 我再debug看看~
> >
> > Paul Lam  于2020年12月8日周二 下午6:00写道:
> >
> > > Hi,
> > >
> > > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > 2020年12月8日 11:03,zilong xiao  写道:
> > > >
> > > > Hi Paul,
> > > >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> > > >
> > >
> >
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> > > > cause。。
> > > >
> > > >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?
> > > >
> > > > Paul Lam  于2020年12月8日周二 上午10:45写道:
> > > >
> > > >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
> > > >>
> > > >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
> > > >>
> > > >> Best,
> > > >> Paul Lam
> > > >>
> > > >>> 2020年12月7日 18:11,zilong xiao  写道:
> > > >>>
> > > >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread
> > > Dump发现有很多名为LeaseRenewer
> > > >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
> > > >>>
> > > >>> Flink version: 1.11
> > > >>> State backend:filesystem
> > > >>> checkpoint interval: 60s
> > > >>
> > > >>
> > >
> > >
> >
>


Re: flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-06-03 文章 zilong xiao
1.10默认用的log4j1,1.12用log4j2

smq <374060...@qq.com> 于2021年6月2日周三 下午3:26写道:

>
> 你的意思是在log4j.properties中的配置吗,我门在这个里边配置了生成日志文件的格式,是在安装节点里加的,不过这个应该不是在webui里显示的。奇怪的一点是我们组有别的程序是正常的,但是一部分在webUI不显示日志。我们目前是从1.10升级到1.12,这种情况在1.12出现的
>
>
>
>
>
> -- 原始邮件 --
> 发件人: r pp  发送时间: 2021年6月2日 15:08
> 收件人: user-zh  主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
>
>
> 嗨~ 你们有没有改日志文件的名字
>
> smq <374060...@qq.com 于2021年6月2日周三 下午12:24写道:
>
>  你这个解决了吗,我也遇到了同样的问题
> 
> 
> 
> 
> 
>  -- 原始邮件 --
>  发件人: todd   发送时间: 2021年4月14日 19:11
>  收件人: user-zh   主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
> 
> 
> 
>  yarn上只有.out,.error的日志信息,但是从flink web ui的log框,无法显示日志内容。
> 
> 
> 
>  --
>  Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
> Best,
>  pp


Re: flink-sql-gateway相关问题

2021-01-27 文章 zilong xiao
好的

Lin Li  于2021年1月27日周三 下午5:20写道:

> try:  "execution.target: yarn-pre-job"  ->  "execution.target:
> yarn-per-job"
>
> zilong xiao  于2021年1月27日周三 上午10:17写道:
>
> > 感谢您的回答,我在flink-conf.yaml里指定"execution.target: yarn-pre-job"后,尝试用rest
> > api生成session id时会遇到异常,不清楚是为何,可否帮忙看下
> >
> > flink version: 1.11.3
> > execution.target: yarn-pre-job
> > rest api请求路径和参数:
> > http://localhost:8083/v1/sessions
> > {
> > "planner": "blink",
> > "execution_type": "streaming"
> >}
> >
> > 异常信息:Caused by: java.lang.IllegalStateException: No ClusterClientFactory
> > found. If you were targeting a Yarn cluster, please make sure to export
> the
> > HADOOP_CLASSPATH environment variable or have hadoop in your classpath.
> For
> > more information refer to the "Deployment & Operations" section of the
> > official Apache Flink documentation.
> >
> > Sebastian Liu  于2021年1月27日周三 上午1:01写道:
> >
> > > sql gateway 提交job方式和flink client类似主要取决于flink-conf中的execution.config 配置,
> > > 对per job模式on yarn, 对应的配置是“yarn-per-job”,
> > >
> > >
> >
> 这样加载的PipelineExecutor会控制为:YarnJobClusterExecutorFactory,具备向Yarn提交job的能力,这和使用flink
> > > client
> > > 提交job最终是一个调用路径,余下在flink-conf中添加上yarn相关配置我理解就可以了。
> > > org.apache.flink.yarn.configuration.YarnConfigOptions
> > >
> > > zilong xiao  于2021年1月26日周二 下午4:00写道:
> > >
> > > > 请问有关于flink-sql-gateway rest api以pre job模式提交作业到yarn集群的文档吗?
> > > >
> > >
> > >
> > > --
> > >
> > > *With kind regards
> > > 
> > > Sebastian Liu 刘洋
> > > Institute of Computing Technology, Chinese Academy of Science
> > > Mobile\WeChat: +86—15201613655
> > > E-mail: liuyang0...@gmail.com 
> > > QQ: 3239559*
> > >
> >
>


Re: Re: flink-sql-gateway稳定性如何,可以在生产环境使用吗?

2021-01-27 文章 zilong xiao
session-client 作用是什么呢? session的维护和管理吗?

felixzh  于2021年1月27日周三 下午5:49写道:

> 如果使用flink-sql-gateway,建议自己参照jdbc封装一个session-client
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-27 14:41:28,"Jeff Zhang"  写道:
> >zeppelin 有 rest api 接口,
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh
> >
> >jinsx  于2021年1月27日周三 下午2:30写道:
> >
> >> 如果使用zeppelin,zeppelin可以提供rpc接口吗
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >
> >
> >--
> >Best Regards
> >
> >Jeff Zhang
>


Re: 关于1.12新增的initialize阶段时间较长问题

2021-02-06 文章 zilong xiao
有截图吗?

赵一旦  于2021年2月7日周日 下午3:13写道:

> 这个问题现在还有个现象,我提交任务,web
> UI就类似卡住状态,过一会刷新出来任务,会有4-5个initialize状态的任务,然后几秒之内陆续消失,剩下1个。
>
> 目前怀疑是有什么重试机制,导致重复提交N个任务,然后可能还有什么去重机制,然后其中几个陆续自动停掉?
>
> 赵一旦  于2021年1月26日周二 上午10:51写道:
>
> > 如上,目前发现以前很快(10-30s)内能从敲命名到running的任务。现在有时候innitialize阶段就得1-2min。不清楚啥情况。
> >
>


  1   2   >