flink输出流类型

2020-11-02 文章 Dream-
hi、
请问下面这两种输出流实现的时候,怎么确定选用实现upsert还是retract,因为看这两个api描述功能是差不多的,并且代码实现只是upsert多了两个方法,具体完成的功能貌似可以是一样的,这要怎么作出选择:



public interface RetractStreamTableSink
extends StreamTableSink
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/api/java/tuple/Tuple2.html>http://docs.oracle.com/javase/7/docs/api/java/lang/Boolean.html?is-external=true>,T>>

Defines an external TableSink

to
emit a streaming Table

with
insert, update, and delete changes.

The table will be converted into a stream of accumulate and retraction
messages which are encoded as Tuple2
.
The first field is a Boolean

flag
to indicate the message type. The second field holds the record of the
requested type T.

A message with true Boolean

flag
is an accumulate (or add) message.

A message with false flag is a retract message.


public interface UpsertStreamTableSink
extends StreamTableSink
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/api/java/tuple/Tuple2.html>http://docs.oracle.com/javase/7/docs/api/java/lang/Boolean.html?is-external=true>,T>>

Defines an external TableSink

to
emit a streaming Table

with
insert, update, and delete changes. The Table

must
be have unique key fields (atomic or composite) or be append-only.

If the Table

does
not have a unique key and is not append-only, a TableException

will
be thrown.

The unique key of the table is configured by the setKeyFields(String[])

 method.

The Table

will
be converted into a stream of upsert and delete messages which are encoded
as Tuple2
.
The first field is a Boolean

flag
to indicate the message type. The second field holds the record of the
requested type T.

A message with true Boolean

field
is an upsert message for the configured key.

A message with false flag is a delete message for the configured key.

If the table is append-only, all messages will have a true flag and must be
interpreted as insertions.


Re: flink1.11 kafka connector

2020-10-29 文章 Dream-
hi、
好的,https://issues.apache.org/jira/browse/FLINK-19871

Jark Wu  于2020年10月29日周四 下午12:06写道:

> 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
>
> Best,
> Jark
>
>
> On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
>
> > hi、
> > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> >
> >- fixed:每个Flink分区最多只能有一个Kafka分区。
> >- round-robin:Flink分区循环分配给Kafka分区。
> >
>


Re: Re: flink table转datastream失败

2020-10-20 文章 Dream-
hi、
是的,类型部分不匹配,类型改完之后程序运行正常了,感谢

hailongwang <18868816...@163.com> 于2020年10月20日周二 下午4:13写道:

> Hi,
>  我看其中一个 condition 是  `t1.uid = t2.refer_id`
> 其中 uid 是 bigint 类型,refer_id 是 varchar 类型。
> 你再确认下?
>
>
> Best,
> Hailong Wang
>
>
>
>
> At 2020-10-20 08:55:34, "Dream-底限"  wrote:
> >hi、
> >我查看了一下,join条件类型是一样的,我这面下游sink使用jdbc时候是可以运行的,但是转换为datastream时候失败了,下面是程序及异常:
> >
> >streamTableEnv.executeSql(kafkaDDL);//ddl语句见下面日志
> >
> >
> >Table table = streamTableEnv.sqlQuery("SELECT cast(t1.id as bigint) as
> >register_id,cast(t1.uid as bigint) as asi_uid,cast(null as bigint) as
> >person_uuid,cast(t1.app_id as bigint) as app_id,cast(t1.country_id as
> >bigint) as country_id,t2.channel_id as channel_id,t2.device_id as
> >device_id,t2.adjust_id as adjust_id,t2.google_adid as
> >google_adid,cast(t3.referrer as bigint) as referrer,t3.login_pwd as
> >login_pwd,cast(t1.sync_data_flag as int) as
> >sync_data_flag,t3.phone_number as
> >register_phone_number,cast(t2.device_type as int) as
> >device_type,t2.imei as imei,t2.device_model as
> >device_model,t2.os_version as os_version,t2.app_name as
> >app_name,t2.app_version as app_version,t2.app_package_name as
> >app_package_name,cast(t2.network_type as string) as
> >network_type,t2.wifi_mac as wifi_mac,t2.lgt as longitude,t2.lat as
> >latitude,cast(null as string) as geo_hash7,t2.ip as
> >ip,unix_timestamp(t1.create_time,'-MM-dd HH:mm:ss') as
> >register_time,UNIX_TIMESTAMP() as etl_time from (SELECT
>
> >`uid`,`update_time`,`adjust_id`,`create_time`,`source_type`,`sync_data_flag`,`id`,`app_id`,`country_id`
> >FROM (SELECT
> `rowData`.`uid`,`rowData`.`update_time`,`rowData`.`adjust_id`,`rowData`.`create_time`,`rowData`.`source_type`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`app_id`,`rowData`.`country_id`,`binlogTime`,ROW_NUMBER()
> >OVER (PARTITION BY `rowData`.`uid`,`rowData`.`id` ORDER BY
> >`binlogTime` desc) AS rownum FROM
> >asi_user_user_service_t_user_register_source) WHERE rownum = 1) t1
> >left join  (SELECT
>
> >`refer_id`,`device_id`,`channel_id`,`device_type`,`imei`,`adjust_id`,`google_adid`,`device_model`,`os_version`,`app_name`,`app_package_name`,`app_version`,`ip`,`network_type`,`wifi_mac`,`lgt`,`lat`,`event_id`,`country_id`,`uid`,`create_time`,`api_version`
> >FROM (SELECT
> `data`.`data`.`refer_id`,`data`.`data`.`device_id`,`data`.`data`.`channel_id`,`data`.`data`.`device_type`,`data`.`data`.`imei`,`data`.`data`.`adjust_id`,`data`.`data`.`google_adid`,`data`.`data`.`device_model`,`data`.`data`.`os_version`,`data`.`data`.`app_name`,`data`.`data`.`app_package_name`,`data`.`data`.`app_version`,`data`.`data`.`ip`,`data`.`data`.`network_type`,`data`.`data`.`wifi_mac`,`data`.`data`.`lgt`,`data`.`data`.`lat`,`data`.`event_id`,`data`.`country_id`,`data`.`uid`,`data`.`create_time`,`data`.`api_version`,ROW_NUMBER()
> >OVER (PARTITION BY `data`.`data`.`refer_id`,`data`.`event_id` ORDER BY
> >createTime desc) AS rownum FROM eventDeviceInfo where
> >`data`.`event_id`=1002) WHERE rownum = 1) t2 on t1.uid = t2.refer_id
> >left join (SELECT
>
> >`register_channel_source`,`last_login_time`,`create_time`,`language`,`avatar`,`login_pwd`,`email_status`,`storage_source`,`uid`,`referrer`,`update_time`,`nickname`,`phone_number`,`sync_data_flag`,`id`,`country_id`,`email`,`status`
> >FROM (SELECT
> `rowData`.`register_channel_source`,`rowData`.`last_login_time`,`rowData`.`create_time`,`rowData`.`language`,`rowData`.`avatar`,`rowData`.`login_pwd`,`rowData`.`email_status`,`rowData`.`storage_source`,`rowData`.`uid`,`rowData`.`referrer`,`rowData`.`update_time`,`rowData`.`nickname`,`rowData`.`phone_number`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`country_id`,`rowData`.`email`,`rowData`.`status`,`binlogTime`,ROW_NUMBER()
> >OVER (PARTITION BY `rowData`.`uid` ORDER BY `binlogTime` desc) AS
> >rownum FROM asi_user_user_service_t_user) WHERE rownum = 1) t3 on
> >t1.uid=t3.uid");
> >
> >table.printSchema();
> >streamTableEnv.toRetractStream(table,
>
> >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
> >streamExecEnv.execute("kafka-json-test");
> >
> >
> >
> >
> >CREATE TABLE eventDeviceInfo (`data` ROW<`event_id`
> >BIGINT,`country_id` BIGINT,`uid` BIGINT,`create_time` BIGINT,`data`
> >ROW<`refer_id` STRING,`device_id` STRING,`channel_id`
> >STRING,`device_type` BIGINT,`imei` STRING,`adjust_id`
> >STRING,`google_adid` STRING,`device_model` STRING,`os_version`
> >STRING,`app_name` STRING,`app_package_name` STRING,`app_version`
> >STRING,`ip` STRING,`network_type` BIGINT,`wifi

Re: flink table转datastream失败

2020-10-19 文章 Dream-
nce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
at 
com.akulaku.blade.source.BladeKafkaJoinCodeTest.eventDeviceInfo(BladeKafkaJoinCodeTest.java:265)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


Process finished with exit code 255


hailongwang <18868816...@163.com> 于2020年10月19日周一 下午8:35写道:

> Hi Dream,
> 可以分享下你完整的程序吗,我感觉这个是因为 JOIN ON 条件上类型不一致引起的,可以分享下你完整的程序看下。
> Best,
> Hailong Wang
>
> 在 2020-10-19 09:50:33,"Dream-底限"  写道:
> >hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。
> >
> >table.printSchema();
> >streamTableEnv.toRetractStream(table,
>
> >Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
> >
> >
> >
> >root
> > |-- register_id: BIGINT
> > |-- asi_uid: BIGINT
> > |-- person_uuid: BIGINT
> > |-- app_id: BIGINT
> > |-- country_id:

flink table转datastream失败

2020-10-18 文章 Dream-
hi、我这面正在将flinktable转换为datastream,现在抛出如下异常,貌似是个bug。。。

table.printSchema();
streamTableEnv.toRetractStream(table,
Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();



root
 |-- register_id: BIGINT
 |-- asi_uid: BIGINT
 |-- person_uuid: BIGINT
 |-- app_id: BIGINT
 |-- country_id: BIGINT
 |-- channel_id: STRING
 |-- device_id: STRING
 |-- adjust_id: STRING
 |-- google_adid: STRING
 |-- referrer: BIGINT
 |-- login_pwd: STRING
 |-- sync_data_flag: INT
 |-- register_phone_number: STRING
 |-- device_type: INT
 |-- imei: STRING
 |-- device_model: STRING
 |-- os_version: STRING
 |-- app_name: STRING
 |-- app_version: STRING
 |-- app_package_name: STRING
 |-- network_type: STRING
 |-- wifi_mac: STRING
 |-- longitude: DECIMAL(38, 18)
 |-- latitude: DECIMAL(38, 18)
 |-- geo_hash7: STRING
 |-- ip: STRING
 |-- register_time: BIGINT
 |-- etl_time: BIGINT NOT NULL


org.apache.flink.table.api.TableException: BIGINT and
VARCHAR(2147483647) does not have common type now


flink sql添加 null值 字段

2020-10-14 文章 Dream-
hi、
我现在使用flink sql完成如下sql语句,但是程序无法运行,请问这个功能要怎么实现:
select null as person_uuid from tablename
抛出异常:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: SQL validation failed. From line 1, column 47 to line 1,
column 50: Illegal use of 'NULL'
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation
failed. From line 1, column 47 to line 1, column 50: Illegal use of 'NULL'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
at com.akulaku.blade.main.StreamingMain.main(StreamingMain.java:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 47 to line 1, column 50: Illegal use of 'NULL'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1906)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1968)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:479)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 21 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Illegal
use of 'NULL'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at 

Re: flink点查时态表支持子查询

2020-10-14 文章 Dream-
hi、
我想到是一个实现方案是在flink端ddl建立lookup表的时候,一张flink表对应上面说的那个外部子查询虚拟表,相当于flink建了一个视图吧

Dream-底限  于2020年10月14日周三 下午2:23写道:

> hi、
>
> 》》你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
> 是的,可以理解为用一个key查询一个视图,这个视图来自于多表关联;在不做视图的情况下,直接点查外部系统的子查询,在flink端依然是原查询样式
> 依然是:JOIN table2 FOR SYSTEM_TIME AS OF 
> table1.proctime,只不过table2不再是一个物理实表,如:table2=(select
> col from table)
>
> Leonard Xu  于2020年10月13日周二 下午8:50写道:
>
>> Hi,
>> 我理解网络开销更多来自于当前的lookup实现每次都需要访问外部系统,如果能做一些cache机制,这样能省掉较多的开销。
>>
>> 你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
>>
>>
>> > 在 2020年10月13日,10:03,Dream-底限  写道:
>> >
>> > hi、
>> >
>> 现在流表查询外部维表的时候,在有多张维表的情况下会多次查询外部系统,这就导致多次网络请求回传,社区后续会不会支持时态表子查询,就是根据指定的key查询外部系统的时候不再是一次查询一个指定的表,可以点查一个sql子表,这样网络io会小一些
>>
>>


Re: flink点查时态表支持子查询

2020-10-14 文章 Dream-
hi、
》》你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
是的,可以理解为用一个key查询一个视图,这个视图来自于多表关联;在不做视图的情况下,直接点查外部系统的子查询,在flink端依然是原查询样式 依然是:
JOIN table2 FOR SYSTEM_TIME AS OF
table1.proctime,只不过table2不再是一个物理实表,如:table2=(select
col from table)

Leonard Xu  于2020年10月13日周二 下午8:50写道:

> Hi,
> 我理解网络开销更多来自于当前的lookup实现每次都需要访问外部系统,如果能做一些cache机制,这样能省掉较多的开销。
>
> 你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
>
>
> > 在 2020年10月13日,10:03,Dream-底限  写道:
> >
> > hi、
> >
> 现在流表查询外部维表的时候,在有多张维表的情况下会多次查询外部系统,这就导致多次网络请求回传,社区后续会不会支持时态表子查询,就是根据指定的key查询外部系统的时候不再是一次查询一个指定的表,可以点查一个sql子表,这样网络io会小一些
>
>


Re: flink on yarn容器异常退出

2020-10-13 文章 Dream-
hi
先前我是一个container申请四个core,经常遇到分配完applicationid然后任务处于standby后就挂掉了,现在申请四个container,每个container一个core后正常启动任务了

Congxian Qiu  于2020年10月13日周二 下午1:12写道:

> Hi
> 容易异常退出是指 container 退出吗?可以看下 JM/TM log 是否有相应信息,如果没有,可以尝试从 yarn 侧看下日志为什么
> container 退出了
> Best,
> Congxian
>
>
> caozhen  于2020年10月12日周一 下午6:08写道:
>
> >
> > 可以发下 "分配完applicationid后,容器经常异常退出"  产生的错误日志吗?
> >
> > 或者排查下flink客户端中的错误日志,以及yarn-historyserver里的日志。
> >
> > 
> >
> > Dream-底限 wrote
> > > hi
> > > 我正在使用flink1.11.1 on
> > >
> >
> yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink点查时态表支持子查询

2020-10-12 文章 Dream-
hi、
现在流表查询外部维表的时候,在有多张维表的情况下会多次查询外部系统,这就导致多次网络请求回传,社区后续会不会支持时态表子查询,就是根据指定的key查询外部系统的时候不再是一次查询一个指定的表,可以点查一个sql子表,这样网络io会小一些


Re: Re:Re: Re: Flink 1.11.1 on k8s 如何配置hadoop

2020-10-11 文章 Dream-
hi、可以去hadoop的一个节点直接打镜像哈,打镜像的时候把需要的hadoop依赖包、flink一起打包到docker里面,然后配置一下环境变量就可以用了;如果你的docker部署节点有hadoop或flink也可以直接外挂;目前我们使用的是第一种

Yang Wang  于2020年10月12日周一 上午10:23写道:

> 只需要base社区的镜像,然后再加上一层(拷贝flink-shaded-hadoop),commit到docker
> image,然后push到docker registry就可以了
>
> 例如Dockerfile可以如下
> FROM flink:1.11.1-scala_2.11
> COPY flink-shaded-hadoop-2*.jar /opt/flink/lib/
>
> 另外,flink-shaded-hadoop可以从这里下载[1]
>
> [1].
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
>
>
> Best,
> Yang
>
> yang  于2020年10月10日周六 下午5:50写道:
>
> > 麻烦问一下,从新打镜像,是把原来的包解压然后从新打包么
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.11.1 输出Hbase,除rowkey以外全部为null

2020-10-10 文章 Dream-
hi、phoenix文档中数据类型部分有描述兼容现有hbase类型的类型,你可以看一下,就是名字有些区别

TonyChen  于2020年10月10日周六 下午5:02写道:

> 好的,我理解您的意思是:
> 1,目前我的数据已经写到hbase中,只是phoenix在获取数据的时候反序列化存在问题。
>
> scan 'ods_iot_gasdevice'
> ROW COLUMN+CELL
>  2900   column=base_info:device_id,
> timestamp=1602319974755, value=\x00\x00\x0BT
>  2900   column=base_info:verify_code,
> timestamp=1602319974755, value=6682acbe69c1a14
>  2900   column=status_info:battery_power,
> timestamp=1602319974755, value=\x00\x00\x00=
>  2900   column=status_info:device_status,
> timestamp=1602319974755, value=\x00\x00\x00\x02
>  2900
>  column=time_characteristics:create_time, timestamp=1602319974755,
> value=\x00\x00\x01u\x13m\xC9D
>
> 2. 我改用hbase shell建库。
>  create 'ods_iot_gasdevice', 'base_info', 'status_info',
> ‘time_characteristics'
> 然后flink写入数据,最后在建立phoenix映射。
> CREATE TABLE IF NOT EXISTS "ods_iot_gasdevice"(
> "rowkey" varchar not null primary key,
> "base_info"."device_id" BIGINT,
> "base_info"."verify_code" varchar,
> "status_info"."battery_power" BIGINT,
> "status_info"."device_status" BIGINT,
> "time_characteristics"."create_time" varchar
> );
> 在这样操作之后,phoenix获取数据仍是之前的现象。我已经用了最基本的数据类型。怎么判定哪些数据类型phoenix类型和hbase类型是兼容的?
>
> 谢谢
>
> Best,
> TonyChen
>
> > 2020年10月10日 下午4:31,Dream-底限  写道:
> >
> > hi、phoenix类型和hbase类型有一部分不兼容:1、使用phoenix建表并通过phoenix jdbc插入数据;2、使用hbase
> > api建表并插入数据,然后phoenix映射到现有hbase表;现在是数据通过hbase
> > api存到hbase了,但phoenix反序列化读取时候有问题,或者你建phoenix表的时候数据类型选择兼容hbase原始类型的类型
> >
> >
> > TonyChen  于2020年10月10日周六 下午4:25写道:
> >
> >> 我这个表就是用 phoenix 创建的。。
> >>
> >>
> >> Best,
> >> TonyChen
> >>
> >>> 2020年10月10日 下午4:11,Dream-底限  写道:
> >>>
> >>> hi、phoenix要映射现有hbase表,改一下phoenix建标语句应该可以解决
> >>>
> >>> TonyChen  于2020年10月10日周六 下午4:03写道:
> >>>
> >>>> 查了下
> >>>>
> >>>> hbase(main):011:0> get 'ods_iot_gasdevice', '2900'
> >>>> COLUMN  CELL
> >>>> base_info:device_idtimestamp=1602314819543,
> >>>> value=\x00\x00\x0BT
> >>>> base_info:verify_code  timestamp=1602314819543,
> >>>> value=54976e8caf8f524
> >>>> status_info:battery_power  timestamp=1602314819543,
> >>>> value=\x00\x00\x00T
> >>>> status_info:device_status  timestamp=1602314819543,
> >>>> value=\x00\x00\x00\x02
> >>>> time_characteristics:create_time   timestamp=1602314819543,
> >>>> value=\x00\x00\x01u\x13\x1F\x1Fv
> >>>> 1 row(s) in 0.1950 seconds
> >>>>
> >>>>
> >>>> Best,
> >>>> TonyChen
> >>>>
> >>>>> 2020年10月10日 下午3:51,Jark Wu  写道:
> >>>>>
> >>>>> 我看你是在 phoenix 中查询的,有试过在 hbase shell 中查询吗?
> >>>>>
> >>>>> On Sat, 10 Oct 2020 at 15:36, TonyChen  wrote:
> >>>>>
> >>>>>> 补充下:print输出数据正常
> >>>>>>
> >>>>>> 11> +I(2910,2910,c514eecbc5c4748,9,1,2020-10-10T15:26:48.182)
> >>>>>> 12> +I(2911,2911,0d1508ec452d27e,68,4,2020-10-10T15:26:48.182)
> >>>>>> 3> +I(2902,2902,54976e60b7e8f54,83,1,2020-10-10T15:26:48.182)
> >>>>>> 5> +I(2904,2904,72806e351aa5849,75,1,2020-10-10T15:26:48.182)
> >>>>>> 8> +I(2907,2907,1f17c8000abc9e1,38,2,2020-10-10T15:26:48.182)
> >>>>>> 1> +I(2900,2900,54976e8caf8f524,84,2,2020-10-10T15:26:48.182)
> >>>>>> 6> +I(2905,2905,70852c245fbd949,45,3,2020-10-10T15:26:48.182)
> >>>>>> 10> +I(2909,2909,22854c002e08c80,48,2,2020-10-10T15:26:48.182)
> >>>>>> 2> +I(2901,2901,4110ae7bfd85814,58,2,2020-10-10T15:26:48.182)
> >>>>>> 9> +I(2908,2908,e5074eeedcaea14,46,3,2020-10-10T15:26:48.182)
> >>>>>> 4> +I(2903,2903,cc936a01758c0f3,6,4,2020-10-10T15:26:48.182)
> >>>>>> 7> +I(2906,2906,30864c59367e180,50,4,2020-10-10T15:26:48.182)
> >>>>>> 7> +I(2918,2918,90bbe807fd5,7,4,2020-10-10T15:26:49.178)
> >>>>>> 9> +I(2920,2920,7e2f257406c730f,54,4,2020-10-10T15:26:49.178)
> >>>>>

Re: Flink 1.11.1 输出Hbase,除rowkey以外全部为null

2020-10-10 文章 Dream-
hi、phoenix类型和hbase类型有一部分不兼容:1、使用phoenix建表并通过phoenix jdbc插入数据;2、使用hbase
api建表并插入数据,然后phoenix映射到现有hbase表;现在是数据通过hbase
api存到hbase了,但phoenix反序列化读取时候有问题,或者你建phoenix表的时候数据类型选择兼容hbase原始类型的类型


TonyChen  于2020年10月10日周六 下午4:25写道:

> 我这个表就是用 phoenix 创建的。。
>
>
> Best,
> TonyChen
>
> > 2020年10月10日 下午4:11,Dream-底限  写道:
> >
> > hi、phoenix要映射现有hbase表,改一下phoenix建标语句应该可以解决
> >
> > TonyChen  于2020年10月10日周六 下午4:03写道:
> >
> >> 查了下
> >>
> >> hbase(main):011:0> get 'ods_iot_gasdevice', '2900'
> >> COLUMN  CELL
> >> base_info:device_idtimestamp=1602314819543,
> >> value=\x00\x00\x0BT
> >> base_info:verify_code  timestamp=1602314819543,
> >> value=54976e8caf8f524
> >> status_info:battery_power  timestamp=1602314819543,
> >> value=\x00\x00\x00T
> >> status_info:device_status  timestamp=1602314819543,
> >> value=\x00\x00\x00\x02
> >> time_characteristics:create_time   timestamp=1602314819543,
> >> value=\x00\x00\x01u\x13\x1F\x1Fv
> >> 1 row(s) in 0.1950 seconds
> >>
> >>
> >> Best,
> >> TonyChen
> >>
> >>> 2020年10月10日 下午3:51,Jark Wu  写道:
> >>>
> >>> 我看你是在 phoenix 中查询的,有试过在 hbase shell 中查询吗?
> >>>
> >>> On Sat, 10 Oct 2020 at 15:36, TonyChen  wrote:
> >>>
> >>>> 补充下:print输出数据正常
> >>>>
> >>>> 11> +I(2910,2910,c514eecbc5c4748,9,1,2020-10-10T15:26:48.182)
> >>>> 12> +I(2911,2911,0d1508ec452d27e,68,4,2020-10-10T15:26:48.182)
> >>>> 3> +I(2902,2902,54976e60b7e8f54,83,1,2020-10-10T15:26:48.182)
> >>>> 5> +I(2904,2904,72806e351aa5849,75,1,2020-10-10T15:26:48.182)
> >>>> 8> +I(2907,2907,1f17c8000abc9e1,38,2,2020-10-10T15:26:48.182)
> >>>> 1> +I(2900,2900,54976e8caf8f524,84,2,2020-10-10T15:26:48.182)
> >>>> 6> +I(2905,2905,70852c245fbd949,45,3,2020-10-10T15:26:48.182)
> >>>> 10> +I(2909,2909,22854c002e08c80,48,2,2020-10-10T15:26:48.182)
> >>>> 2> +I(2901,2901,4110ae7bfd85814,58,2,2020-10-10T15:26:48.182)
> >>>> 9> +I(2908,2908,e5074eeedcaea14,46,3,2020-10-10T15:26:48.182)
> >>>> 4> +I(2903,2903,cc936a01758c0f3,6,4,2020-10-10T15:26:48.182)
> >>>> 7> +I(2906,2906,30864c59367e180,50,4,2020-10-10T15:26:48.182)
> >>>> 7> +I(2918,2918,90bbe807fd5,7,4,2020-10-10T15:26:49.178)
> >>>> 9> +I(2920,2920,7e2f257406c730f,54,4,2020-10-10T15:26:49.178)
> >>>> 4> +I(2915,2915,b4f2549ea4436b7,60,2,2020-10-10T15:26:49.178)
> >>>> 11> +I(2922,2922,d7bfa14a1a9555f,6,1,2020-10-10T15:26:49.178)
> >>>> 6> +I(2917,2917,9515c66bb40527b,78,1,2020-10-10T15:26:49.178)
> >>>> 12> +I(2923,2923,8f4535a229b3a32,1,1,2020-10-10T15:26:49.178)
> >>>> 5> +I(2916,2916,093037cdc5acec9,50,4,2020-10-10T15:26:49.178)
> >>>> 2> +I(2913,2913,2842267351e9487,55,2,2020-10-10T15:26:49.178)
> >>>> 10> +I(2921,2921,7704923d1f5042b,33,4,2020-10-10T15:26:49.178)
> >>>> 1> +I(2912,2912,eba073be1b6f4b2,72,2,2020-10-10T15:26:49.178)
> >>>> 8> +I(2919,2919,dd4ae09e50859b9,3,3,2020-10-10T15:26:49.178)
> >>>> 3> +I(2914,2914,49655f1ca53e26e,24,2,2020-10-10T15:26:49.178)
> >>>> 11> +I(2934,2934,3a00563a0ecaf6b,57,2,2020-10-10T15:26:50.173)
> >>>> 9> +I(2932,2932,9a67d05622a57ae,54,1,2020-10-10T15:26:50.173)
> >>>> 4> +I(2927,2927,c6db9148eeddd51,26,1,2020-10-10T15:26:50.173)
> >>>> 5> +I(2928,2928,ae06b971bdeaff4,35,1,2020-10-10T15:26:50.173)
> >>>> 1> +I(2924,2924,176b97cbb37b8e6,62,1,2020-10-10T15:26:50.173)
> >>>> 6> +I(2929,2929,115d2d3f33188a6,62,2,2020-10-10T15:26:50.173)
> >>>> 2> +I(2925,2925,3f3db2f63e505c7,86,4,2020-10-10T15:26:50.173)
> >>>> 3> +I(2926,2926,d88f38f093f274d,55,1,2020-10-10T15:26:50.173)
> >>>> 12> +I(2935,2935,56d5fbf93fd515d,72,1,2020-10-10T15:26:50.173)
> >>>> 8> +I(2931,2931,dd98a2999028a83,59,1,2020-10-10T15:26:50.173)
> >>>> 10> +I(2933,2933,377ec5fa4e52ca3,28,1,2020-10-10T15:26:50.173)
> >>>> 7> +I(2930,2930,d2e93ff01ba5d2b,18,1,2020-10-10T15:26:50.174)
> >>>> 9> +I(2944,2944,8589c30a128c7eb,45,3,2020-10-10T15:26:51.173)
> >>>> 1> +I(2936,2936,d536e4d1703d20c,98,3,2020-10-10T15:26:51.173)
> >>>> 5> +I(2940,2940,8fd48a674b19424,10,2,2020-10-10T15:26:51.173)
> >>>> 11> +I(2946,2946,862c6321430e164,74,1,2020-10-10T15:26:51.173)
> >>>> 2> +I(2937,2937,d177b845862342f,83,3,2020-10-10T15:26:51.173)
> >>>> 6> +I(2941,2941,7c45b6f1ae69811,86,1,2020-10-10T15:26:51.173)
> >>>> 4> +I(2939,2939,8db073425a9d0a9,81,1,2020-10-10T15:26:51.173)
> >>>> 8> +I(2943,2943,b0d63314e8f7c63,57,1,2020-10-10T15:26:51.173)
> >>>> 10> +I(2945,2945,384375f5f161a93,67,2,2020-10-10T15:26:51.173)
> >>>> 12> +I(2947,2947,7a7ed532a9e5b5a,61,4,2020-10-10T15:26:51.173)
> >>>> 3> +I(2938,2938,10e55f27f137a7f,22,4,2020-10-10T15:26:51.173)
> >>>> 7> +I(2942,2942,52ae903a2633739,91,1,2020-10-10T15:26:51.176)
> >>>> 3> +I(2950,2950,e3e60cd0630aa66,88,4,2020-10-10T15:26:52.174)
> >>>> 1> +I(2948,2948,2cca133cca14cbf,44,1,2020-10-10T15:26:52.174)
> >>>> 2> +I(2949,2949,7cc761cdb8bb924,77,4,2020-10-10T15:26:52.174)
> >>>>
> >>>>
> >>>> Best,
> >>>> TonyChen
> >>>>
> >>>>
> >>
> >>
>
>


Re: Flink 1.11.1 输出Hbase,除rowkey以外全部为null

2020-10-10 文章 Dream-
hi、phoenix要映射现有hbase表,改一下phoenix建标语句应该可以解决

TonyChen  于2020年10月10日周六 下午4:03写道:

> 查了下
>
> hbase(main):011:0> get 'ods_iot_gasdevice', '2900'
> COLUMN  CELL
>  base_info:device_idtimestamp=1602314819543,
> value=\x00\x00\x0BT
>  base_info:verify_code  timestamp=1602314819543,
> value=54976e8caf8f524
>  status_info:battery_power  timestamp=1602314819543,
> value=\x00\x00\x00T
>  status_info:device_status  timestamp=1602314819543,
> value=\x00\x00\x00\x02
>  time_characteristics:create_time   timestamp=1602314819543,
> value=\x00\x00\x01u\x13\x1F\x1Fv
> 1 row(s) in 0.1950 seconds
>
>
> Best,
> TonyChen
>
> > 2020年10月10日 下午3:51,Jark Wu  写道:
> >
> > 我看你是在 phoenix 中查询的,有试过在 hbase shell 中查询吗?
> >
> > On Sat, 10 Oct 2020 at 15:36, TonyChen  wrote:
> >
> >> 补充下:print输出数据正常
> >>
> >> 11> +I(2910,2910,c514eecbc5c4748,9,1,2020-10-10T15:26:48.182)
> >> 12> +I(2911,2911,0d1508ec452d27e,68,4,2020-10-10T15:26:48.182)
> >> 3> +I(2902,2902,54976e60b7e8f54,83,1,2020-10-10T15:26:48.182)
> >> 5> +I(2904,2904,72806e351aa5849,75,1,2020-10-10T15:26:48.182)
> >> 8> +I(2907,2907,1f17c8000abc9e1,38,2,2020-10-10T15:26:48.182)
> >> 1> +I(2900,2900,54976e8caf8f524,84,2,2020-10-10T15:26:48.182)
> >> 6> +I(2905,2905,70852c245fbd949,45,3,2020-10-10T15:26:48.182)
> >> 10> +I(2909,2909,22854c002e08c80,48,2,2020-10-10T15:26:48.182)
> >> 2> +I(2901,2901,4110ae7bfd85814,58,2,2020-10-10T15:26:48.182)
> >> 9> +I(2908,2908,e5074eeedcaea14,46,3,2020-10-10T15:26:48.182)
> >> 4> +I(2903,2903,cc936a01758c0f3,6,4,2020-10-10T15:26:48.182)
> >> 7> +I(2906,2906,30864c59367e180,50,4,2020-10-10T15:26:48.182)
> >> 7> +I(2918,2918,90bbe807fd5,7,4,2020-10-10T15:26:49.178)
> >> 9> +I(2920,2920,7e2f257406c730f,54,4,2020-10-10T15:26:49.178)
> >> 4> +I(2915,2915,b4f2549ea4436b7,60,2,2020-10-10T15:26:49.178)
> >> 11> +I(2922,2922,d7bfa14a1a9555f,6,1,2020-10-10T15:26:49.178)
> >> 6> +I(2917,2917,9515c66bb40527b,78,1,2020-10-10T15:26:49.178)
> >> 12> +I(2923,2923,8f4535a229b3a32,1,1,2020-10-10T15:26:49.178)
> >> 5> +I(2916,2916,093037cdc5acec9,50,4,2020-10-10T15:26:49.178)
> >> 2> +I(2913,2913,2842267351e9487,55,2,2020-10-10T15:26:49.178)
> >> 10> +I(2921,2921,7704923d1f5042b,33,4,2020-10-10T15:26:49.178)
> >> 1> +I(2912,2912,eba073be1b6f4b2,72,2,2020-10-10T15:26:49.178)
> >> 8> +I(2919,2919,dd4ae09e50859b9,3,3,2020-10-10T15:26:49.178)
> >> 3> +I(2914,2914,49655f1ca53e26e,24,2,2020-10-10T15:26:49.178)
> >> 11> +I(2934,2934,3a00563a0ecaf6b,57,2,2020-10-10T15:26:50.173)
> >> 9> +I(2932,2932,9a67d05622a57ae,54,1,2020-10-10T15:26:50.173)
> >> 4> +I(2927,2927,c6db9148eeddd51,26,1,2020-10-10T15:26:50.173)
> >> 5> +I(2928,2928,ae06b971bdeaff4,35,1,2020-10-10T15:26:50.173)
> >> 1> +I(2924,2924,176b97cbb37b8e6,62,1,2020-10-10T15:26:50.173)
> >> 6> +I(2929,2929,115d2d3f33188a6,62,2,2020-10-10T15:26:50.173)
> >> 2> +I(2925,2925,3f3db2f63e505c7,86,4,2020-10-10T15:26:50.173)
> >> 3> +I(2926,2926,d88f38f093f274d,55,1,2020-10-10T15:26:50.173)
> >> 12> +I(2935,2935,56d5fbf93fd515d,72,1,2020-10-10T15:26:50.173)
> >> 8> +I(2931,2931,dd98a2999028a83,59,1,2020-10-10T15:26:50.173)
> >> 10> +I(2933,2933,377ec5fa4e52ca3,28,1,2020-10-10T15:26:50.173)
> >> 7> +I(2930,2930,d2e93ff01ba5d2b,18,1,2020-10-10T15:26:50.174)
> >> 9> +I(2944,2944,8589c30a128c7eb,45,3,2020-10-10T15:26:51.173)
> >> 1> +I(2936,2936,d536e4d1703d20c,98,3,2020-10-10T15:26:51.173)
> >> 5> +I(2940,2940,8fd48a674b19424,10,2,2020-10-10T15:26:51.173)
> >> 11> +I(2946,2946,862c6321430e164,74,1,2020-10-10T15:26:51.173)
> >> 2> +I(2937,2937,d177b845862342f,83,3,2020-10-10T15:26:51.173)
> >> 6> +I(2941,2941,7c45b6f1ae69811,86,1,2020-10-10T15:26:51.173)
> >> 4> +I(2939,2939,8db073425a9d0a9,81,1,2020-10-10T15:26:51.173)
> >> 8> +I(2943,2943,b0d63314e8f7c63,57,1,2020-10-10T15:26:51.173)
> >> 10> +I(2945,2945,384375f5f161a93,67,2,2020-10-10T15:26:51.173)
> >> 12> +I(2947,2947,7a7ed532a9e5b5a,61,4,2020-10-10T15:26:51.173)
> >> 3> +I(2938,2938,10e55f27f137a7f,22,4,2020-10-10T15:26:51.173)
> >> 7> +I(2942,2942,52ae903a2633739,91,1,2020-10-10T15:26:51.176)
> >> 3> +I(2950,2950,e3e60cd0630aa66,88,4,2020-10-10T15:26:52.174)
> >> 1> +I(2948,2948,2cca133cca14cbf,44,1,2020-10-10T15:26:52.174)
> >> 2> +I(2949,2949,7cc761cdb8bb924,77,4,2020-10-10T15:26:52.174)
> >>
> >>
> >> Best,
> >> TonyChen
> >>
> >>
>
>


Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 Dream-
可以直接用yarnclient直接提交,flinkonyarn也是yarnclient提交的吧,不过感觉自己实现一遍挺麻烦的,我们最后也选的是process的方式

xiao cai  于2020年9月29日周二 下午5:54写道:

> 这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗?
>
>
>  原始邮件
> 发件人: todd
> 收件人: user-zh
> 发送时间: 2020年9月29日(周二) 17:36
> 主题: Re: 怎么样在Flink中使用java代码提交job到yarn
>
>
> https://github.com/todd5167/flink-spark-submiter
> 可以参考这个案例,用ClusterCLient提交。 -- Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: flink多流关联

2020-09-27 文章 Dream-
hi、
我想问一下就是多个流关联输出的时间点是所有维度全部关联上了才输出吗,比如abcd四个流,abc关联上了但这时d的数据还没有到,这个是不触发输出的吧

Michael Ran  于2020年9月27日周日 下午2:38写道:

> 会有相互等待,或者先后不一致的情况,要业务自己衡量等待时间
> 在 2020-09-27 12:09:23,"Dream-底限"  写道:
> >hi
>
> >我们这面想用flink在实时数仓上面做些事情,进行调研后发现数仓可能有多张表进行join、现flink有两种实现方案:第一种是流表lookup时态表,但时态表更新可能会延迟导致查询结果不准确,并且io可能过大;第二种是双流关联,但是如果说有五张表进行join的话,除了状态太大还有其他问题吗,或者说有多流相互等待的问题吗
>


flink多流关联

2020-09-26 文章 Dream-
hi
我们这面想用flink在实时数仓上面做些事情,进行调研后发现数仓可能有多张表进行join、现flink有两种实现方案:第一种是流表lookup时态表,但时态表更新可能会延迟导致查询结果不准确,并且io可能过大;第二种是双流关联,但是如果说有五张表进行join的话,除了状态太大还有其他问题吗,或者说有多流相互等待的问题吗


flink on yarn容器异常退出

2020-09-22 文章 Dream-
hi
我正在使用flink1.11.1 on
yarn以分离模式运行任务,但在任务提交的时候,任务在分配完applicationid后,容器经常异常退出,先前以为是yarn环境问题,但是在两个集群测都有遇到这种情况,请问这是一个已知的bug吗


flink检查点

2020-09-15 文章 Dream-
hi、
我正在做基于检查点的任务自动恢复,请问有没有什么方法来检查检查点是否是一个完整有效的检查点,因为有的时候检查点会失败,所以有没有api层面的校验方式


Re: flink json ddl解析

2020-09-03 文章 Dream-
hi
我现在使用的方式就是对于类型不一致的数组元素全部使用ARRAY来解析,感觉这是一个可以优化的点,想找一个更好的方式

Benchao Li  于2020年9月3日周四 下午12:57写道:

> Hi,
> 如果声明为 ARRAY 是否可以满足你的需求呢?如果可以的话,你可以在
> 1.12之后使用这个feature[1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-18002
>
> zilong xiao  于2020年9月1日周二 下午5:49写道:
>
> > 问题大概懂了,坐等Flink大佬回复
> >
> > Dream-底限  于2020年9月1日周二 下午4:43写道:
> >
> > > hi
> > > 就是json数组如果是这种:[1,2,3],我可以直接array解析
> > >
> > >
> >
> 如果json数组是这种:[1,"test",true],如果我用array>程序是没办法运行的,如果我用array > > int,b string,c boolean>>,flink做ddl翻译解析json的时候会把row > > boolean>这一部分映射为解析jsonobject,但是array元素不是jsonobject会导致取不到数据
> > >
> > > zilong xiao  于2020年9月1日周二 下午4:04写道:
> > >
> > > > 基本类型包装一层会导致解析不出来  这个没太明白,可以举个列子吗?
> > > >
> > > > Dream-底限  于2020年9月1日周二 下午2:20写道:
> > > >
> > > > > hi、
> > > > >
> > > >
> > >
> >
> 我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况
> > > > >
> > > > > zilong xiao  于2020年9月1日周二 上午11:47写道:
> > > > >
> > > > > > like this:  ARRAY > > > > String>>>
> > > > > >
> > > > > > Dream-底限  于2020年9月1日周二 上午11:40写道:
> > > > > >
> > > > > > > hi
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> > > > > > >
> > array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
> > > > > > >
> > > > > > >
> > > > > > > private static TypeInformation convertArray(String location,
> > > > > > > JsonNode node, JsonNode root) {
> > > > > > >// validate items
> > > > > > >if (!node.has(ITEMS)) {
> > > > > > >   throw new IllegalArgumentException(
> > > > > > >  "Arrays must specify an '" + ITEMS + "' property in
> > node:
> > > "
> > > > +
> > > > > > > location);
> > > > > > >}
> > > > > > >final JsonNode items = node.get(ITEMS);
> > > > > > >
> > > > > > >// list (translated to object array)
> > > > > > >if (items.isObject()) {
> > > > > > >   final TypeInformation elementType = convertType(
> > > > > > >  location + '/' + ITEMS,
> > > > > > >  items,
> > > > > > >  root);
> > > > > > >   // result type might either be ObjectArrayTypeInfo or
> > > > > > > BasicArrayTypeInfo for Strings
> > > > > > >   return Types.OBJECT_ARRAY(elementType);
> > > > > > >}
> > > > > > >// tuple (translated to row)
> > > > > > >else if (items.isArray()) {
> > > > > > >   final TypeInformation[] types = convertTypes(location
> +
> > > '/'
> > > > +
> > > > > > > ITEMS, items, root);
> > > > > > >
> > > > > > >   // validate that array does not contain additional items
> > > > > > >   if (node.has(ADDITIONAL_ITEMS) &&
> > > > > > > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > > > > > > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> > > > > > >  throw new IllegalArgumentException(
> > > > > > > "An array tuple must not allow additional items in
> > > node:
> > > > "
> > > > > > > + location);
> > > > > > >   }
> > > > > > >
> > > > > > >   return Types.ROW(types);
> > > > > > >}
> > > > > > >throw new IllegalArgumentException(
> > > > > > >   "Invalid type for '" + ITEMS + "' property in node: " +
> > > > > location);
> > > > > > > }
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink json ddl解析

2020-09-01 文章 Dream-
hi
就是json数组如果是这种:[1,2,3],我可以直接array解析
如果json数组是这种:[1,"test",true],如果我用array>程序是没办法运行的,如果我用array>,flink做ddl翻译解析json的时候会把row这一部分映射为解析jsonobject,但是array元素不是jsonobject会导致取不到数据

zilong xiao  于2020年9月1日周二 下午4:04写道:

> 基本类型包装一层会导致解析不出来  这个没太明白,可以举个列子吗?
>
> Dream-底限  于2020年9月1日周二 下午2:20写道:
>
> > hi、
> >
> 我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况
> >
> > zilong xiao  于2020年9月1日周二 上午11:47写道:
> >
> > > like this:  ARRAY > String>>>
> > >
> > > Dream-底限  于2020年9月1日周二 上午11:40写道:
> > >
> > > > hi
> > > >
> > > >
> > >
> >
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> > > > array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
> > > >
> > > >
> > > > private static TypeInformation convertArray(String location,
> > > > JsonNode node, JsonNode root) {
> > > >// validate items
> > > >if (!node.has(ITEMS)) {
> > > >   throw new IllegalArgumentException(
> > > >  "Arrays must specify an '" + ITEMS + "' property in node: "
> +
> > > > location);
> > > >}
> > > >final JsonNode items = node.get(ITEMS);
> > > >
> > > >// list (translated to object array)
> > > >if (items.isObject()) {
> > > >   final TypeInformation elementType = convertType(
> > > >  location + '/' + ITEMS,
> > > >  items,
> > > >  root);
> > > >   // result type might either be ObjectArrayTypeInfo or
> > > > BasicArrayTypeInfo for Strings
> > > >   return Types.OBJECT_ARRAY(elementType);
> > > >}
> > > >// tuple (translated to row)
> > > >else if (items.isArray()) {
> > > >   final TypeInformation[] types = convertTypes(location + '/'
> +
> > > > ITEMS, items, root);
> > > >
> > > >   // validate that array does not contain additional items
> > > >   if (node.has(ADDITIONAL_ITEMS) &&
> > > > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > > > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> > > >  throw new IllegalArgumentException(
> > > > "An array tuple must not allow additional items in node:
> "
> > > > + location);
> > > >   }
> > > >
> > > >   return Types.ROW(types);
> > > >}
> > > >throw new IllegalArgumentException(
> > > >   "Invalid type for '" + ITEMS + "' property in node: " +
> > location);
> > > > }
> > > >
> > >
> >
>


Re: flink json ddl解析

2020-09-01 文章 Dream-
hi、
我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况

zilong xiao  于2020年9月1日周二 上午11:47写道:

> like this:  ARRAY>>
>
> Dream-底限  于2020年9月1日周二 上午11:40写道:
>
> > hi
> >
> >
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> > array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
> >
> >
> > private static TypeInformation convertArray(String location,
> > JsonNode node, JsonNode root) {
> >// validate items
> >if (!node.has(ITEMS)) {
> >   throw new IllegalArgumentException(
> >  "Arrays must specify an '" + ITEMS + "' property in node: " +
> > location);
> >}
> >final JsonNode items = node.get(ITEMS);
> >
> >// list (translated to object array)
> >if (items.isObject()) {
> >   final TypeInformation elementType = convertType(
> >  location + '/' + ITEMS,
> >  items,
> >  root);
> >   // result type might either be ObjectArrayTypeInfo or
> > BasicArrayTypeInfo for Strings
> >   return Types.OBJECT_ARRAY(elementType);
> >}
> >// tuple (translated to row)
> >else if (items.isArray()) {
> >   final TypeInformation[] types = convertTypes(location + '/' +
> > ITEMS, items, root);
> >
> >   // validate that array does not contain additional items
> >   if (node.has(ADDITIONAL_ITEMS) &&
> > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> >  throw new IllegalArgumentException(
> > "An array tuple must not allow additional items in node: "
> > + location);
> >   }
> >
> >   return Types.ROW(types);
> >}
> >throw new IllegalArgumentException(
> >   "Invalid type for '" + ITEMS + "' property in node: " + location);
> > }
> >
>


flink json ddl解析

2020-08-31 文章 Dream-
hi
我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常


private static TypeInformation convertArray(String location,
JsonNode node, JsonNode root) {
   // validate items
   if (!node.has(ITEMS)) {
  throw new IllegalArgumentException(
 "Arrays must specify an '" + ITEMS + "' property in node: " +
location);
   }
   final JsonNode items = node.get(ITEMS);

   // list (translated to object array)
   if (items.isObject()) {
  final TypeInformation elementType = convertType(
 location + '/' + ITEMS,
 items,
 root);
  // result type might either be ObjectArrayTypeInfo or
BasicArrayTypeInfo for Strings
  return Types.OBJECT_ARRAY(elementType);
   }
   // tuple (translated to row)
   else if (items.isArray()) {
  final TypeInformation[] types = convertTypes(location + '/' +
ITEMS, items, root);

  // validate that array does not contain additional items
  if (node.has(ADDITIONAL_ITEMS) &&
node.get(ADDITIONAL_ITEMS).isBoolean() &&
node.get(ADDITIONAL_ITEMS).asBoolean()) {
 throw new IllegalArgumentException(
"An array tuple must not allow additional items in node: "
+ location);
  }

  return Types.ROW(types);
   }
   throw new IllegalArgumentException(
  "Invalid type for '" + ITEMS + "' property in node: " + location);
}


Re: flink1.11时间函数

2020-08-29 文章 Dream-
哦哦,好吧,我昨天用NOW的时候直接报错告诉我这是个bug,让我提交issue,我以为这种标示的都是函数功能有问题的

Benchao Li  于2020年8月28日周五 下午8:01写道:

> 不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。
> 对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。
> 这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。
>
> Dream-底限  于2020年8月28日周五 下午2:50写道:
>
> > hi
> >
> > UNIX_TIMESTAMP()
> >
> > NOW()
> >
> > 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗
> >
>
>
> --
>
> Best,
> Benchao Li
>


flink如何将时间戳转换为长整型并精确到毫秒

2020-08-28 文章 Dream-
hi、
我这面使用flink时间函数转换为长整型的时候只能做到秒,除了自定义udf有办法做到转换到毫秒精度吗


flink文档

2020-08-28 文章 Dream-
hi、
哪位大佬可以把flink官方文档中的函数部分完善一下啊,函数下面配个应用方式可好,看文档我都不知道下面函数是怎么用的,有的可以直接用有的sql解析不通过,还得一个一个测。。。
Temporal functionsDescription

DATE string

Returns a SQL date parsed from *string* in form of "-MM-dd".

TIME string

Returns a SQL time parsed from *string* in form of "HH:mm:ss".

TIMESTAMP string

Returns a SQL timestamp parsed from *string* in form of "-MM-dd
HH:mm:ss[.SSS]".

INTERVAL string range

Parses an interval *string* in the form "dd hh:mm:ss.fff" for SQL intervals
of milliseconds or "-mm" for SQL intervals of months. An interval range
might be DAY, MINUTE, DAY TO HOUR, or DAY TO SECOND for intervals of
milliseconds; YEAR or YEAR TO MONTH for intervals of months.

E.g., INTERVAL '10 00:00:00.004' DAY TO SECOND, INTERVAL '10' DAY, or INTERVAL
'2-10' YEAR TO MONTH return intervals.

CURRENT_DATE

Returns the current SQL date in the UTC time zone.

CURRENT_TIME

Returns the current SQL time in the UTC time zone.

CURRENT_TIMESTAMP

Returns the current SQL timestamp in the UTC time zone.

LOCALTIME

Returns the current SQL time in local time zone.

LOCALTIMESTAMP

Returns the current SQL timestamp in local time zone.


flink1.11时间函数

2020-08-28 文章 Dream-
hi

UNIX_TIMESTAMP()

NOW()

我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗


Re: [需求调研] Stream SQL window join 支持

2020-08-27 文章 Dream-
我先前遇到的是,两个流在同一个窗口上求某个字段的count比例,比如求上一个小时购买对访问求比

Danny Chan  于2020年8月27日周四 下午2:48写道:

> 大家好 ~
>
> 这里做一个 window-join[1] 的需求调研, window-join 是 Flink DataStream 上已经有的 feature.
> 目标是决策是否要在 SQL 上支持该特性, 例如, tumbling window join 语法可能长这样:
>
> ```sql
> select ... window_start, window_end
> from TABLE(
>   TUMBLE(
> DATA => TABLE table_a,
> TIMECOL => DESCRIPTOR(rowtime),
> SIZE => INTERVAL '1' MINUTE)) tumble_a
>   [LEFT | RIGHT | FULL OUTER] JOIN TABLE(
>   TUMBLE(
> DATA => TABLE table_b,
> TIMECOL => DESCRIPTOR(rowtime),
> SIZE => INTERVAL '1' MINUTE)) tumble_b
> on tumble_a.col1 = tumble_b.col1 and ...
> ```
>
> 目前了解到的情况是一些公司 interval join 用的比较多,window join 的 case 还是比较少的, 希望这里可以有更多的反馈。
>
> 希望您可以分享 window join 的一些使用案例,以及为什么选用 window-join (而不是 interval join)。
>
> 感谢 ~
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html
>
> Best,
> Danny Chan
>


flink1.11 kafka sql connector

2020-08-25 文章 Dream-
hi
我正在使用DDL语句创建kafka数据源,但是查看文档时候发现没有报漏参数指定消费者组的offset是否提交,请问这个默认情况下offset会不会提交到kafka分区


Re: flink1.11 cdc使用

2020-08-24 文章 Dream-
好的,感谢

china_tao  于2020年8月24日周一 下午12:21写道:

> 支持。
> insert into mysqlresult select k.vin,k.msgtime,d.brand_name from (SELECT
> vin,max(msgtime) as msgtime,max(pts) as pts from kafkaSourceTable  group by
> TUMBLE(rowtime, INTERVAL '10' SECOND),vin) AS k left join msyqlDimTable
> FOR
> SYSTEM_TIME AS OF k.pts AS d ON k.vin = d.vin
>
> 类似这样,先开10秒窗口获得kafka数据,然后join msyql维度表,然后插入mysql。
>
> 关键就是注意维度表lookup_cache_max-rows,lookup_cache_ttl这两个参数,设置维度表的更新时间。具体项目,具体对待,关键就是看看需要维度表支持多长时间的更新延迟。
> 另外,join维度表,目前应该只支持pts,不支持rowtime。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.11 cdc使用

2020-08-21 文章 Dream-
hi
我这面想使用flinkcdc做实时etl,我看可以做到维表(时态表)关联,现在想问一下能在cdc功能中用聚合算子嘛,全局groupby或窗口函数


Re: flink任务提交

2020-08-19 文章 Dream-
是的给傻瓜用户用的,开发人员大多不喜欢用

赵一旦  于2020年8月19日周三 下午5:34写道:

> 做个任务管理界面,然后点点点,到后台,然后后台通过java程序提交任务,这样吗?高端。
>
> Dream-底限  于2020年8月19日周三 下午5:22写道:
>
> > 想springboot服务化对外报漏接口请求
> >
> > 赵一旦  于2020年8月19日周三 下午5:18写道:
> >
> > > 直接写个脚本提交不可以吗,为什么一定要通过java提交呢?
> > >
> > > Dream-底限  于2020年8月19日周三 下午4:41写道:
> > >
> > > > 好的,感谢
> > > >
> > > > Jeff Zhang  于2020年8月19日周三 下午4:31写道:
> > > >
> > > > > Zeppelin 最近在做这样的API来提交Flink Job,这里有个例子可以参考下
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L307
> > > > >
> > > > > 可以加入钉钉群讨论,钉钉群号:32803524
> > > > >
> > > > >
> > > > >
> > > > > Dream-底限  于2020年8月19日周三 下午4:27写道:
> > > > >
> > > > > > hi、
> > > > > > 请问你说的是自己拼接cli字符串,然后通过java调用子进程执行的这种方式吗
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 我先前也是这么做的,但感觉比较怪异,这种方式有一个问题是貌似没办法直接返回applicationId,要从日志里面筛选好像,再就是没办法判断提交是否成功,貌似也是从日志来做,请问这个applicationId在提交的时候除了从日志筛选有其他的获取方式吗
> > > > > >
> > > > > > wxpcc  于2020年8月19日周三 下午4:09写道:
> > > > > >
> > > > > > > 大概可以用,YarnClusterDescriptor
> > > > > > >
> > > > > > > 命令行方式封装提交对于后续升级更加方便一些,个人建议
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best Regards
> > > > >
> > > > > Jeff Zhang
> > > > >
> > > >
> > >
> >
>


Re: flinkSQL eventtime问题

2020-08-19 文章 Dream-
hi
可以在转换为table之前把map的eventtime字段提取出来注册

★猛★  于2020年8月19日周三 下午5:23写道:

> hi,
>
>
>  咨询一下,我有个table里边有个map字段,我想用map里的某个字段作为eventtime,有没有办法实现?


Re: flink任务提交

2020-08-19 文章 Dream-
想springboot服务化对外报漏接口请求

赵一旦  于2020年8月19日周三 下午5:18写道:

> 直接写个脚本提交不可以吗,为什么一定要通过java提交呢?
>
> Dream-底限  于2020年8月19日周三 下午4:41写道:
>
> > 好的,感谢
> >
> > Jeff Zhang  于2020年8月19日周三 下午4:31写道:
> >
> > > Zeppelin 最近在做这样的API来提交Flink Job,这里有个例子可以参考下
> > >
> > >
> >
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L307
> > >
> > > 可以加入钉钉群讨论,钉钉群号:32803524
> > >
> > >
> > >
> > > Dream-底限  于2020年8月19日周三 下午4:27写道:
> > >
> > > > hi、
> > > > 请问你说的是自己拼接cli字符串,然后通过java调用子进程执行的这种方式吗
> > > >
> > > >
> > >
> >
> 我先前也是这么做的,但感觉比较怪异,这种方式有一个问题是貌似没办法直接返回applicationId,要从日志里面筛选好像,再就是没办法判断提交是否成功,貌似也是从日志来做,请问这个applicationId在提交的时候除了从日志筛选有其他的获取方式吗
> > > >
> > > > wxpcc  于2020年8月19日周三 下午4:09写道:
> > > >
> > > > > 大概可以用,YarnClusterDescriptor
> > > > >
> > > > > 命令行方式封装提交对于后续升级更加方便一些,个人建议
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Sent from: http://apache-flink.147419.n8.nabble.com/
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>


Re: flink任务提交

2020-08-19 文章 Dream-
好的,感谢

Jeff Zhang  于2020年8月19日周三 下午4:31写道:

> Zeppelin 最近在做这样的API来提交Flink Job,这里有个例子可以参考下
>
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L307
>
> 可以加入钉钉群讨论,钉钉群号:32803524
>
>
>
> Dream-底限  于2020年8月19日周三 下午4:27写道:
>
> > hi、
> > 请问你说的是自己拼接cli字符串,然后通过java调用子进程执行的这种方式吗
> >
> >
> 我先前也是这么做的,但感觉比较怪异,这种方式有一个问题是貌似没办法直接返回applicationId,要从日志里面筛选好像,再就是没办法判断提交是否成功,貌似也是从日志来做,请问这个applicationId在提交的时候除了从日志筛选有其他的获取方式吗
> >
> > wxpcc  于2020年8月19日周三 下午4:09写道:
> >
> > > 大概可以用,YarnClusterDescriptor
> > >
> > > 命令行方式封装提交对于后续升级更加方便一些,个人建议
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: flink任务提交

2020-08-19 文章 Dream-
hi、
请问你说的是自己拼接cli字符串,然后通过java调用子进程执行的这种方式吗
我先前也是这么做的,但感觉比较怪异,这种方式有一个问题是貌似没办法直接返回applicationId,要从日志里面筛选好像,再就是没办法判断提交是否成功,貌似也是从日志来做,请问这个applicationId在提交的时候除了从日志筛选有其他的获取方式吗

wxpcc  于2020年8月19日周三 下午4:09写道:

> 大概可以用,YarnClusterDescriptor
>
> 命令行方式封装提交对于后续升级更加方便一些,个人建议
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.11 hive catalog使用

2020-08-19 文章 Dream-
hi
先设置hiveCatalog及默认数据库,然后use hivecatalog
接下来建表的时候create temporary table test(...),使用的时候:select ... from test
这个时候就报错说hivecatalog的默认数据库内无这张表

Rui Li  于2020年8月19日周三 下午3:49写道:

> 是怎么用的啊,我去试试
>
> On Wed, Aug 19, 2020 at 11:19 AM Dream-底限  wrote:
>
> > hi
> > 我先前用这种方式创建的表,在sql中直接使用的时候提示hivecatalog内无这张表,请问这张表使用的时候要怎么引用哪
> > >或者用create temporary table的方式应该也可以。
> >
> > Rui Li  于2020年8月19日周三 上午11:11写道:
> >
> > > 可以把kafka的表保存在内置的in-memory
> > catalog里,名字应该是default_catalog,这样就不会持久化了。或者用create
> > > temporary table的方式应该也可以。
> > >
> > > On Wed, Aug 19, 2020 at 10:53 AM Dream-底限  wrote:
> > >
> > > > hi
> > > > 我这面在使用hivecatalog将kafka数据落地到hive,但现在有一个问题是,我不想保存kafka source
> > > > table元数据(默认自动保存),通过创建视图或临时表的方式启动flink任务,这时候直接通过kafka source
> > > > table表名直接引用报错,提示hive catalog内没这张表,这种情况我改怎么引用我的kafka未持久化表哪
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


flink任务提交

2020-08-19 文章 Dream-
hi、
我这面想通过java程序使用perJob模式提交一个flink
jar任务到yarn集群,我看了一下flink-client和flink-yarn包,感觉是可以做到的,但又有点儿无从入手的感觉,请问flink可以做到吗,具体调用哪个类哪


Re: flink1.12啥时候会出呢?

2020-08-18 文章 Dream-
个人估计恐怕得年底,貌似九月冻结吧,然后测试修复什么的就差不多阳历年了。。。

赵一旦  于2020年8月19日周三 上午11:16写道:

> 如题,有大概时间嘛。
>


Re: Flink1.11 hive catalog使用

2020-08-18 文章 Dream-
hi
我先前用这种方式创建的表,在sql中直接使用的时候提示hivecatalog内无这张表,请问这张表使用的时候要怎么引用哪
>或者用create temporary table的方式应该也可以。

Rui Li  于2020年8月19日周三 上午11:11写道:

> 可以把kafka的表保存在内置的in-memory catalog里,名字应该是default_catalog,这样就不会持久化了。或者用create
> temporary table的方式应该也可以。
>
> On Wed, Aug 19, 2020 at 10:53 AM Dream-底限  wrote:
>
> > hi
> > 我这面在使用hivecatalog将kafka数据落地到hive,但现在有一个问题是,我不想保存kafka source
> > table元数据(默认自动保存),通过创建视图或临时表的方式启动flink任务,这时候直接通过kafka source
> > table表名直接引用报错,提示hive catalog内没这张表,这种情况我改怎么引用我的kafka未持久化表哪
> >
>
>
> --
> Best regards!
> Rui Li
>


Flink1.11 hive catalog使用

2020-08-18 文章 Dream-
hi
我这面在使用hivecatalog将kafka数据落地到hive,但现在有一个问题是,我不想保存kafka source
table元数据(默认自动保存),通过创建视图或临时表的方式启动flink任务,这时候直接通过kafka source
table表名直接引用报错,提示hive catalog内没这张表,这种情况我改怎么引用我的kafka未持久化表哪


flink1.11任务资源分批

2020-08-16 文章 Dream-
hi、
请问如果想要flink任务自动扩缩容有什么办法吗,反压的时候自动加资源,然后在自动缩。


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-12 文章 Dream-
flink暴漏的lookup
是支持设置缓存记录条数和缓存时间的吧,把时间和条数设置的小一点或者直接禁用缓存,如果流表数据量不大的话可以不用异步访问,数据量大的话异步访问不加缓存维表存储引擎可能压力过大

Jim Chen  于2020年8月13日周四 上午11:53写道:

> 请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 es connector

2020-08-11 文章 Dream-
hi
主要做实时特征,历史数据会做一次批量load,然后flink跑实时特征的计算,涉及到一些维度打算用双流关联+流表时态表关联,下游存储hbase/es/jdbc,先前想都用es,不过感觉es不是干这个的。。

jacky-cui <826885...@qq.com> 于2020年8月11日周二 下午2:33写道:

> Hi, 我这边现在也用es做报表分析,但是对于你提到的场景我有个点不是很明白。
> 你们的flink 在其中充当的是什么角色呢,是清洗出维度结果,然后写到es吗?还是?
> 我是直接做成大宽表,然后用DSL语句去查询这个大宽表,能cover大部分报表需求
>
>
> 祝好
> 崔黄飞
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> imj...@gmail.com;
> 发送时间:2020年8月10日(星期一) 中午12:36
> 收件人:"user-zh"
> 主题:Re: flink1.11 es connector
>
>
>
> 对 ES 研究不是很深入。 个人觉得是一个实用的场景。
>
> On Fri, 7 Aug 2020 at 09:50, Dream-底限 
>  hi、
> 
> 是的,大佬感觉用es做维表靠谱吗,这样就不用维护hbase二级索引了(或者用es存hbase二级索引,hbase存数据,但还是要用es充当一个维度)
> 
>  Jark Wu  
>   目前社区由一个 issue 在跟进 es source ,可以关注一下:
>   https://issues.apache.org/jira/browse/FLINK-16713
>   你想要的时态表查询,是想当成维表查询吗(lookup)?
>  
>   Best,
>   Jark
>  
>   On Thu, 6 Aug 2020 at 11:20, Dream-底限  wrote:
>  
>hi
>   
>   
>  
> 
> 我们这面想用es做时态表查询,但是flink没有报漏es源连接器,需要自己实现一个,请问大家对es做时态表这件事感觉靠谱吗(ps:之所以不用hbase是因为hbase的rowkey设计以及可能维护的二级索引比较麻烦,但hbase也是一个调研方案)
>   
>  
> 


flink集群搭建

2020-08-10 文章 Dream-
hi、
FlinkOnYarn集群部署是推荐使用yarn-session模式所有任务共用一个,还是推荐使用preJob模式每个任务起一个小集群


Re: flink1.11 es connector

2020-08-06 文章 Dream-
hi、
是的,大佬感觉用es做维表靠谱吗,这样就不用维护hbase二级索引了(或者用es存hbase二级索引,hbase存数据,但还是要用es充当一个维度)

Jark Wu  于2020年8月7日周五 上午12:34写道:

> 目前社区由一个 issue 在跟进 es source ,可以关注一下:
> https://issues.apache.org/jira/browse/FLINK-16713
> 你想要的时态表查询,是想当成维表查询吗(lookup)?
>
> Best,
> Jark
>
> On Thu, 6 Aug 2020 at 11:20, Dream-底限  wrote:
>
> > hi
> >
> >
> 我们这面想用es做时态表查询,但是flink没有报漏es源连接器,需要自己实现一个,请问大家对es做时态表这件事感觉靠谱吗(ps:之所以不用hbase是因为hbase的rowkey设计以及可能维护的二级索引比较麻烦,但hbase也是一个调研方案)
> >
>


flink1.11 es connector

2020-08-05 文章 Dream-
hi
我们这面想用es做时态表查询,但是flink没有报漏es源连接器,需要自己实现一个,请问大家对es做时态表这件事感觉靠谱吗(ps:之所以不用hbase是因为hbase的rowkey设计以及可能维护的二级索引比较麻烦,但hbase也是一个调研方案)


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Dream-
hi 鱼子酱、
我当初这样用的时候状态也不清理(子查询+时间窗口+union),后来把时间窗口改成全局group函数,union改成订阅topic列表后,设置状态过期时间状态才清理。。。
后来看资料有的说分区数据不均衡导致水印不推进的话可能导致这种状态不清理的问题,但是我感觉不是水印导致的,水印导致的窗口应该不触发计算吧,感觉这里面有些bug,需要专业人士定位一下

鱼子酱 <384939...@qq.com> 于2020年7月29日周三 上午9:53写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> 
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> 
> 
>
>
>
> Congxian Qiu wrote
> > Hi
> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >> select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,commandId as request_id
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >> union all
> >>
> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
> >> )
> >>
> >>
> >> source:
> >>
> >> create table log (
> >>   eventTime bigint
> >>   ,times timestamp(3)
> >>   
> >>   ,commandId integer
> >>   ,watermark for times as times - interval '5' second
> >> )
> >> with(
> >>  'connector' = 'kafka-0.10',
> >>  'topic' = '……',
> >>  'properties.bootstrap.servers' = '……',
> >>  'properties.group.id' = '……',
> >>  'scan.startup.mode' = 'latest-offset',
> >>  'format' = 'json'
> >> )
> >>
> >> sink1:
> >> create table result (
> >>   request_time varchar
> >>   ,request_id integer
> >>   ,request_cnt bigint
> >>   ,avg_resptime double
> >>   ,stddev_resptime double
> >>   ,insert_time varchar
> >> ) with (
> >>   'connector' = 'kafka-0.10',
> >>   'topic' = '……',
> >>   'properties.bootstrap.servers' = '……',
> >>   'properties.group.id' = '……',
> >>   'scan.startup.mode' = 'latest-offset',
> >>   'format' = 'json'
> >> )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink解析kafka json数据

2020-07-23 文章 Dream-
hi jark wu、
将解析错误数据直接打到日志里确实是比较通用的解决方案;
我现在使用flink sql对接kafka
json数据的时候,发现对json数据的解析有一些局限性,即比如我有一条数据是jsonobject,但是我在定义flink sql
connector数据类型的时候如果直接定义为string,会导致数据解析失败(当然,这个失败是正常的)
但是这会有一个局限性就是我没办法以一个string方式获取一个jsonobject数据(由于一些比较尴尬的原因就想以string方式获取jsonobject数据),查看代码发现这是jackson导致的获取失败,这个社区考虑兼容一下吗?

if (simpleTypeInfo == Types.STRING) {
   return Optional.of((mapper, jsonNode) -> jsonNode.asText());//
这里会返回空,改成下面样子兼容一下

if (simpleTypeInfo == Types.STRING) {
   return Optional.of((mapper, jsonNode) ->
jsonNode.isTextual()?jsonNode.asText():jsonNode.toString());


Jark Wu  于2020年7月21日周二 下午10:30写道:

> 目前是不支持的。这个需求有点太业务特定了。flink 不可能为了一个错误日志去抽象、对接各种存储系统。
> 一种方案是社区可以考虑支持下打印到日志里,然后用户可以通过自定义插件 log appender 写入外部存储。
>
> Best,
> Jark
>
> On Tue, 21 Jul 2020 at 18:53, Dream-底限  wrote:
>
> > hi
> >  json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃
> >
> > Leonard Xu  于2020年7月21日周二 下午4:18写道:
> >
> > > Hi,
> > > 我理解应该做不到,因为这两个format参数在format里就做的。
> > > json.ignore-parse-errors 是在
> > > format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> > > 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> > > 这两个不能同时为ture,语义上就是互斥的。
> > >
> > > Best
> > > Leonard Xu
> > > > 在 2020年7月21日,16:08,Dream-底限  写道:
> > > >
> > > >
> > >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
> > >
> > >
> >
>


Re: flink1.11日志上报

2020-07-23 文章 Dream-
hi Cayden chen、
也就是说你们日志上报的实现方式是实现自定义appder来实现是吧,这确实是一个不错的方式;
我先前看spark可以实现对应的listener用来实现日志上报,查看了一下flink
api貌似也有对应listen,具体是实现哪一个还不知道,现在我们还处在一个功能整理阶段


Cayden chen <1193216...@qq.com> 于2020年7月24日周五 上午10:53写道:

> 我们的获取逻辑是通过自定义 logback的appder( flink
> 默认的应该是log4j,对应配置在安装目录的conf下面),appder通过解析当前系统路径(因为flink每个taskmanager会自己定义一个带有applicationId的路径,然后里面会放各种jar包,包括我自定义的appder),获取之后通过MDC.put(),给日志加一列appId,在appder里面把日志上报到外部的日志系统
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> zhan...@akulaku.com;
> 发送时间:2020年7月24日(星期五) 上午10:49
> 收件人:"user-zh"
> 主题:flink1.11日志上报
>
>
>
> hi、
>
> 我这面想实现一个日志上报的功能,就是flink任务启动后,让flink主动将当前任务日志打到外部存储系统,想问一下flink有对应的接口吗,具体要实现哪一个类哪


flink1.11日志上报

2020-07-23 文章 Dream-
hi、
我这面想实现一个日志上报的功能,就是flink任务启动后,让flink主动将当前任务日志打到外部存储系统,想问一下flink有对应的接口吗,具体要实现哪一个类哪


Re: flink row 类型

2020-07-23 文章 Dream-
hi xiao cai
我懂你的意思了,这确实是一种解决方式,不过这种方式有一个弊端就是每个这种功能都要开发对应的方法,我还是比较倾向于一个方法适用于一类场景,如果做不到只能每次有需求都重新开发了

xiao cai  于2020年7月23日周四 下午3:40写道:

> Hi ,Dream
>
>
> 比如你最终拿到的是Row(10),10表示有10个字段,这些字段的顺序是固定的,那么你可以把每个字段在row里的索引的映射关系保存下来,如下
> map ,然后 row.getField(map.get(fieldName))获取你需要的值
>
>
>  原始邮件
> 发件人: Dream-底限
> 收件人: user-zh
> 发送时间: 2020年7月23日(周四) 14:57
> 主题: Re: flink row 类型
>
>
> hi、xiao cai 可以说一下思路吗,我没太懂 》》可以考虑把字段索引值保存下来再获取 Dream-底限 <
> zhan...@akulaku.com> 于2020年7月23日周四 下午2:56写道: > hi、Jingsong Li >
> 我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称 > >
> >>在TypeInference中有input的type,这个type应该是包含字段信息的。 > > xiao cai <
> flin...@163.com> 于2020年7月23日周四 下午2:19写道: > >> 可以考虑把字段索引值保存下来再获取 >> >> >>
> 原始邮件 >> 发件人: Dream-底限 >> 收件人: user-zh<
> user-zh@flink.apache.org> >> 发送时间: 2020年7月23日(周四) 14:08 >> 主题: Re: flink
> row 类型 >> >> >> hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li <
> libenc...@apache.org> >> 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL
> plan过程的东西,在运行时它就没有什么实际意义了。 > >>
> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 < >>
> zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > > >>
> 我这面定义row数据,类型为ROW,可以通过 > > >>
> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > > >>
> rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao
> >> Li > > >


Re: flink1.11 tablefunction

2020-07-23 文章 Dream-
hi
这貌似确实是一个bug,先用子查询打开后程序就可以运行正常了

Benchao Li  于2020年7月23日周四 下午12:52写道:

> 现在有一个work around,就是你可以用子查询先把row展开,比如:
> select ...
> from (
>   select data.rule_results as rule_results, ...
> ) cross join unnest(rule_results) as t(...)
>
> Benchao Li  于2020年7月23日周四 下午12:44写道:
>
> > 我感觉这可能是calcite的bug,CC Danny老师
> >
> > Dream-底限  于2020年7月22日周三 下午5:46写道:
> >
> >> hi 、Benchao Li
> >> 我尝试了将数组打散的方式,但是报了一个莫名其妙的错误,可以帮忙看看嘛
> >>
> >> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
> >> "  data ROW >> STRING,path STRING,country_id INT,create_time BIGINT," +
> >> "spent_time DECIMAL(10,2),features
> >> ROW<`user_ic_no_aku_uid.pdl_cdpd`
> >> STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
> >> "`user_ic_no_aku_uid.current_overdue_collection`
> >> INT>,rule_results ARRAY >> "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
> >> "  createTime BIGINT,\n" +
> >> "  tindex INT\n" +
> >> ") WITH (\n" +
> >> " 'connector' = 'kafka-0.11',\n" +
> >> " 'topic' = 'parser_data_test',\n" +
> >> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> >> " 'properties.group.id' = 'testGroup',\n" +
> >> " 'scan.startup.mode' = 'earliest-offset',\n" +
> >> " 'format' = 'json',\n" +
> >> " 'json.fail-on-missing-field' = 'false',\n" +
> >> " 'json.ignore-parse-errors' = 'true'\n" +
> >> ")");
> >>
> >> Table table = tableEnv.sqlQuery("select
> >>
> >>
> data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex
> >> from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t
> >> (rule_id,rule_name,rule_type_name,`result`,in_path)");
> >>
> >> table.printSchema();
> >> tableEnv.toAppendStream(table,
> >>
> >>
> Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
> >>
> >>
> >> 异常信息:
> >>
> >> rg.apache.flink.table.api.ValidationException: SQL validation failed.
> >> From line 0, column 0 to line 1, column 139: Column 'data.data' not
> >> found in table 'parser_data_test'
> >>
> >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> >> at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> >> at
> >>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
> >> at
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> >> at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> >> at
> >>
> com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at
> >>
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> >> at
> >>
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> >> at
> >>
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> >> at
> >>
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> >> at
> >>
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> >> at
> >>
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> >> at
> >>
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> &g

Re: flink row 类型

2020-07-23 文章 Dream-
hi、xiao cai

可以说一下思路吗,我没太懂
》》可以考虑把字段索引值保存下来再获取

Dream-底限  于2020年7月23日周四 下午2:56写道:

> hi、Jingsong Li
> 我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称
>
> >>在TypeInference中有input的type,这个type应该是包含字段信息的。
>
> xiao cai  于2020年7月23日周四 下午2:19写道:
>
>> 可以考虑把字段索引值保存下来再获取
>>
>>
>>  原始邮件
>> 发件人: Dream-底限
>> 收件人: user-zh
>> 发送时间: 2020年7月23日(周四) 14:08
>> 主题: Re: flink row 类型
>>
>>
>> hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 
>> 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 >
>> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 <
>> zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > >
>> 我这面定义row数据,类型为ROW,可以通过 > >
>> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > >
>> rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao
>> Li >
>
>


Re: flink row 类型

2020-07-23 文章 Dream-
hi、Jingsong Li
我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称

>>在TypeInference中有input的type,这个type应该是包含字段信息的。

xiao cai  于2020年7月23日周四 下午2:19写道:

> 可以考虑把字段索引值保存下来再获取
>
>
>  原始邮件
> 发件人: Dream-底限
> 收件人: user-zh
> 发送时间: 2020年7月23日(周四) 14:08
> 主题: Re: flink row 类型
>
>
> hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 
> 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 >
> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 <
> zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > >
> 我这面定义row数据,类型为ROW,可以通过 > >
> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > >
> rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao
> Li >


Re: flink row 类型

2020-07-23 文章 Dream-
hi
是的,我们的数据场景比较尴尬,那我想其他方式实现一下

Benchao Li  于2020年7月23日周四 下午12:55写道:

> 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。
> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。
>
> Dream-底限  于2020年7月22日周三 下午7:22写道:
>
> > hi、
> > 我这面定义row数据,类型为ROW,可以通过
> > row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口
> >
> > rule_key  转换为rule_key1,rulekey2
> > 1
> > 2
> >
>
>
> --
>
> Best,
> Benchao Li
>


flink row 类型

2020-07-22 文章 Dream-
hi、
我这面定义row数据,类型为ROW,可以通过
row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口

rule_key  转换为rule_key1,rulekey2
1
2


Re: flink1.11 tablefunction

2020-07-22 文章 Dream-
e.DelegatingScope.fullyQualify(DelegatingScope.java:439)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 28 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
Column 'data.data' not found in table 'parser_data_test'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
... 51 more


Benchao Li  于2020年7月22日周三 下午2:05写道:

> Hi,
> 如果只是想打平array,Flink有个内置的方法,可以参考[1]的”Expanding arrays into a relation“部分
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> Jark Wu  于2020年7月22日周三 上午11:17写道:
>
> > Hi,
> >
> > Row[] 作为 eval 参数,目前还不支持。社区已经有一个 issue 在跟进支持这个功能:
> > https://issues.apache.org/jira/browse/FLINK-17855
> >
> >
> > Best,
> > Jark
> >
> > On Wed, 22 Jul 2020 at 10:45, Dream-底限  wrote:
> >
> > > hi,
> > >  我想将一个array打散成多行,但是并没有成功
> > >
> > > @FunctionHint(input =@DataTypeHint("ARRAY > > STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>") ,output
> > > = @DataTypeHint("ROW > > STRING,`result` INT,in_path BOOLEAN>"))
> > > public static class FlatRowFunction extends TableFunction {
> > > private static final long serialVersionUID = 1L;
> > >
> > > public void eval(Row[] rows) {
> > > for (Row row : rows) {
> > > collect(row);
> > > }
> > > }
> > > }
> > >
> > > 异常如下:
> > >
> > > org.apache.flink.table.api.ValidationException: SQL validation failed.
> > > From line 1, column 149 to line 1, column 174: No match found for
> > > function signature
> > > flatRow( > > VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name,
> > > INTEGER result, BOOLEAN in_path) ARRAY>)
> > >
> > > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > >
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> > > at
> > >
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> > > at
> > >
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> > > at
> > >
> >
> org.apache.flink.table.planner.delegation.ParserImpl.pars

flink1.11 sql

2020-07-21 文章 Dream-
hi
flink支持配置hive方言,那么flink可以直接使用hive内自定义的udf、udtf函数吗


flink1.11 实现tablefunction报错

2020-07-21 文章 Dream-
hi、
我这面实现了一个tablefunction想打撒数据,但是现在我运行官方demo样式的demo都无法成功,请问下面是什么原因:

@FunctionHint(output = @DataTypeHint("ROW"))
public static class FlatRowFunction extends TableFunction {
private static final long serialVersionUID = 1L;

public void eval(String rows) {
for (String row : rows.split("-->")) {
collect(Row.of(row));
}
}
}

sql使用:from parser_data_test, LATERAL TABLE(flatRow(data.path))

异常:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
>From line 1, column 154 to line 1, column 171: No match found for
function signature flatRow()
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
at com.akulaku.data.flink.ParserDataTest.main(ParserDataTest.java:60)
Caused by: org.apache.calcite.runtime.CalciteContextException: From
line 1, column 154 to line 1, column 171: No match found for function
signature flatRow()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
match found for function signature flatRow()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 

Re: flink1.11 tablefunction

2020-07-21 文章 Dream-
SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 28 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No
match found for function signature
flatRow()
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
... 56 more


godfrey he  于2020年7月21日周二 下午7:41写道:

> 可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>
>
> Dream-底限  于2020年7月21日周二 下午7:25写道:
>
> > hi
> >
> >
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> > 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
> >
>


flink1.11 tablefunction

2020-07-21 文章 Dream-
hi
我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)


Re: flink解析kafka json数据

2020-07-21 文章 Dream-
hi
 json.ignore-parse-errors那只配置这个就好了, 其实我想把解析失败的数据存储到外部系统,而不是直接丢弃

Leonard Xu  于2020年7月21日周二 下午4:18写道:

> Hi,
> 我理解应该做不到,因为这两个format参数在format里就做的。
> json.ignore-parse-errors 是在
> format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field
> 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)
> 这两个不能同时为ture,语义上就是互斥的。
>
> Best
> Leonard Xu
> > 在 2020年7月21日,16:08,Dream-底限  写道:
> >
> >
> json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储
>
>


Re: connector hive依赖冲突

2020-07-21 文章 Dream-
hi,
不排除依赖的话环境都起不来的哈,
java.lang.IncompatibleClassChangeError: Implementing class

at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:112)
at
org.apache.flink.table.planner.delegation.StreamPlanner.(StreamPlanner.scala:48)
at
org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:130)
at
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
at
com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

Rui Li  于2020年7月20日周一 上午11:48写道:

> 现在具体是遇到了什么冲突呀?hive
> connector本身在依赖hive的时候确实也排除了很多传递依赖,才能正常运行UT和IT。也可以参考我们的pom来看排除了哪些依赖:
>
> https://github.com/apache/flink/blob/release-1.11.0/flink-connectors/flink-connector-hive/pom.xml
>
> On Fri, Jul 17, 2020 at 5:32 PM Dream-底限  wrote:
>
> > hi
> > 我用的是用户定义依赖,没有用捆绑依赖包,捆绑依赖包还要自己下载一次。
> >
> > Dream-底限  于2020年7月17日周五 下午5:24写道:
> >
> > > 1.9和1.10时候排除一些传递依赖后在idea和打uber jar在集群环境都可以运行,不排除传递依赖的话在idea运行不了;
> > >
> >
> 1.11现在只在本地测哪,不排除传递依赖idea运行不了,集群环境还没弄,但是我感觉在idea直接run这个功能好多人都需要,文档是不是可以改进一下
> > >
> > > Jingsong Li  于2020年7月17日周五 下午5:16写道:
> > >
> > >> 用bundle jar可以搞定吗?
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Jul 17, 2020 at 5:14 PM Dream-底限  wrote:
> > >>
> > >> > hi:
> > >> >
> > >> >
> > >>
> >
> 大佬们,下面连接hive的依赖包的哪个传递依赖导致的jar包冲突,我从1.9到1.11每次在maven按照官方文档引包都会出现依赖冲突1.9刚发布的时候对下面的引包有做依赖排除,后来文档改了
> > >> >
> > >> > // Flink's Hive connector.Contains flink-hadoop-compatibility and
> > >> > flink-orc jars
> > >> >flink-connector-hive_2.11-1.11.0.jar
> > >> >// Hive dependencies
> > >> >hive-exec-2.3.4.jar
> > >> >
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


flink解析kafka json数据

2020-07-21 文章 Dream-
hi
我这面在使用sql api解析kafka
json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors
son.fail-on-missing-field


Re: flink1.11 run

2020-07-20 文章 Dream-
hi、
对于下面这两个的滚动方式,是选优先到达的吗,就是1min的checkpoint和128mb的file size,不管哪个先到都会滚动生成新的文件

》可以,默认下 128MB 滚动,Checkpoint 滚动

Jingsong Li  于2020年7月20日周一 下午6:12写道:

> Hi Dream,
>
> > 1.一定要在flink内部先建立hive表吗?
>
> 不用,哪边建无所谓
>
> > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
> 可以,默认下 128MB 滚动,Checkpoint 滚动。
>
> Best,
> Jingsong
>
> On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:
>
> >  hi
> > 好的,想问一下stream写hive表的时候:
> > 1、一定要在flink内部先建立hive表吗?
> > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
> >
> > Rui Li  于2020年7月20日周一 下午4:44写道:
> >
> > > tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
> > >
> > > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> > >
> > > > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> > > >
> > > > 异常:
> > > > The program finished with the following exception:
> > > >
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > method
> > > > caused an error: No operators defined in streaming topology. Cannot
> > > > generate StreamGraph.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > > at
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > > streaming topology. Cannot generate StreamGraph.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > > at
> > > >
> > > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > > ... 11 more
> > > > 代码:
> > > >
> > > >  StreamExecutionEnvironment environment =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > EnvironmentSettings settings =
> > > >
> > > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > > StreamTableEnvironment tableEnv =
> >

Re: Flink 1.11 Hive Streaming Write的问题

2020-07-20 文章 Dream-
hi、
请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据

李佳宸  于2020年7月16日周四 下午10:39写道:

> 好的,谢谢~~~
>
> JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
>
> > hi
> > 需要开启checkpoint
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月16日 18:03,李佳宸 写道:
> > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> > 批量的hive写入,流环境的读取是正常的。
> >
> > 附代码,很简短:
> >
> > public class KafkaToHiveStreaming {
> >public static void main(String[] arg) throws Exception{
> >StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >EnvironmentSettings bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >StreamTableEnvironment bsTableEnv =
> > StreamTableEnvironment.create(bsEnv, bsSettings);
> >String name= "myhive";
> >String defaultDatabase = "default";
> >String hiveConfDir =
> > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > path
> >String version = "3.1.2";
> >
> >HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> >bsTableEnv.registerCatalog("myhive", hive);
> >bsTableEnv.useCatalog("myhive");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> >"  id BIGINT ," +
> >"  order_id STRING," +
> >"  amount DECIMAL(10, 2)," +
> >"  create_time TIMESTAMP " +
> >") WITH (" +
> >" 'connector' = 'kafka'," +
> >" 'topic' = 'order.test'," +
> >" 'properties.bootstrap.servers' = 'localhost:9092'," +
> >" 'properties.group.id' = 'testGroup'," +
> >" 'scan.startup.mode' = 'earliest-offset', " +
> >" 'format' = 'json'  " +
> >")");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >
> >bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
> >"  id BIGINT ," +
> >"  order_id STRING," +
> >"  amount DECIMAL(10, 2)" +
> >"  )");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > ('connector' = 'print')" +
> >"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > ALL)");
> >
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> SELECT
> > " +
> >"id, " +
> >"order_id, " +
> >"amount " +
> >"FROM topic_products");
> >
> >Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> >table1.executeInsert("print_table");
> >}
> > }
> >
>


Re: flink1.11 run

2020-07-20 文章 Dream-
 hi
好的,想问一下stream写hive表的时候:
1、一定要在flink内部先建立hive表吗?
2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗

Rui Li  于2020年7月20日周一 下午4:44写道:

> tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
>
> On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
>
> > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> >
> > 异常:
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: No operators defined in streaming topology. Cannot
> > generate StreamGraph.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > at
> >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: java.lang.IllegalStateException: No operators defined in
> > streaming topology. Cannot generate StreamGraph.
> > at
> >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > at
> >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > at
> >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more
> > 代码:
> >
> >  StreamExecutionEnvironment environment =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > StreamTableEnvironment tableEnv =
> > StreamTableEnvironment.create(environment, settings);
> >
> >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > environment.setStateBackend(new MemoryStateBackend());
> > environment.getCheckpointConfig().setCheckpointInterval(5000);
> >
> > String name = "myhive";
> > String defaultDatabase = "tmp";
> > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > String version = "1.1.0";
> >
> > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> > tableEnv.registerCatalog("myhive", hive);
> > tableEnv.useCatalog("myhive");
> >
> > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > "  user_id BIGINT,\n" +
> > "  item_id STRING,\n" +
> > "  behavior STRING,\n" +
> > "  ts AS PROCTIME()\n" +
> > ") WITH (\n" +
> > " 'connector' = 'kafka-0.11',\n" +
> > " 'topic' = 'user_behavior',\n" +
> > " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> > " 'properties.group.id' = 'testGroup',\n" +
> > " 'scan.startup.mode' = 'ear

flink1.11 run

2020-07-20 文章 Dream-
hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:

异常:
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No operators defined in streaming topology. Cannot
generate StreamGraph.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at
com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
代码:

 StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(environment, settings);


environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.setStateBackend(new MemoryStateBackend());
environment.getCheckpointConfig().setCheckpointInterval(5000);

String name = "myhive";
String defaultDatabase = "tmp";
String hiveConfDir = "/etc/alternatives/hive-conf/";
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
"  user_id BIGINT,\n" +
"  item_id STRING,\n" +
"  behavior STRING,\n" +
"  ts AS PROCTIME()\n" +
") WITH (\n" +
" 'connector' = 'kafka-0.11',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");

//tableEnv.executeSql("CREATE TABLE print_table (\n" +
//" user_id BIGINT,\n" +
//" item_id STRING,\n" +
//" behavior STRING,\n" +
//" tsdata STRING\n" +
//") WITH (\n" +
//" 'connector' = 'print'\n" +
//")");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
" user_id BIGINT,\n" +
" item_id STRING,\n" +
" behavior STRING,\n" +
" tsdata STRING\n" +
") STORED AS parquet TBLPROPERTIES (\n" +
" 'sink.rolling-policy.file-size' = '12MB',\n" +
" 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
" 'sink.rolling-policy.check-interval' = '1 min',\n" +
" 'execution.checkpointing.interval' = 'true'\n" +
")");

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("insert into streamhivetest select
user_id,item_id,behavior,DATE_FORMAT(ts, '-MM-dd') as tsdata from
user_behavior");

tableEnv.execute("stream-write-hive");


Re: connector hive依赖冲突

2020-07-17 文章 Dream-
hi
我用的是用户定义依赖,没有用捆绑依赖包,捆绑依赖包还要自己下载一次。

Dream-底限  于2020年7月17日周五 下午5:24写道:

> 1.9和1.10时候排除一些传递依赖后在idea和打uber jar在集群环境都可以运行,不排除传递依赖的话在idea运行不了;
> 1.11现在只在本地测哪,不排除传递依赖idea运行不了,集群环境还没弄,但是我感觉在idea直接run这个功能好多人都需要,文档是不是可以改进一下
>
> Jingsong Li  于2020年7月17日周五 下午5:16写道:
>
>> 用bundle jar可以搞定吗?
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#using-bundled-hive-jar
>>
>> Best,
>> Jingsong
>>
>> On Fri, Jul 17, 2020 at 5:14 PM Dream-底限  wrote:
>>
>> > hi:
>> >
>> >
>> 大佬们,下面连接hive的依赖包的哪个传递依赖导致的jar包冲突,我从1.9到1.11每次在maven按照官方文档引包都会出现依赖冲突1.9刚发布的时候对下面的引包有做依赖排除,后来文档改了
>> >
>> > // Flink's Hive connector.Contains flink-hadoop-compatibility and
>> > flink-orc jars
>> >flink-connector-hive_2.11-1.11.0.jar
>> >// Hive dependencies
>> >hive-exec-2.3.4.jar
>> >
>>
>>
>> --
>> Best, Jingsong Lee
>>
>


connector hive依赖冲突

2020-07-17 文章 Dream-
hi:
大佬们,下面连接hive的依赖包的哪个传递依赖导致的jar包冲突,我从1.9到1.11每次在maven按照官方文档引包都会出现依赖冲突1.9刚发布的时候对下面的引包有做依赖排除,后来文档改了

// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
   flink-connector-hive_2.11-1.11.0.jar
   // Hive dependencies
   hive-exec-2.3.4.jar


flink1.9写权限认证的es6

2020-07-16 文章 Dream-
hi:
请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


Re: flink1.9状态及作业迁移

2020-07-13 文章 Dream-
hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> ____
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


flink1.9状态及作业迁移

2020-07-13 文章 Dream-
hi:
flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?


Re: flink sql 侧输出

2020-07-13 文章 Dream-
hi、
好的,感谢

On Mon, Jul 13, 2020 at 12:07 PM Jark Wu  wrote:

> Hi,
>
> Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,
> 你可以给你的作业配上:
>
> table.exec.emit.late-fire.enabled = true
> table.exec.emit.late-fire.delay = 1min
>
> 同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window
> allowLateness 。
>
> 具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。
>
> Best,
> Jark
>
> On Mon, 13 Jul 2020 at 11:31, Dream-底限  wrote:
>
> > hi、
> > 大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql
> > api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?
> >
>


flink sql 侧输出

2020-07-12 文章 Dream-
hi、
大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql
api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?


Re: flink kafka connector中获取kafka元数据

2020-07-07 文章 Dream-
好的

On Tue, Jul 7, 2020 at 5:30 PM Leonard Xu  wrote:

> 嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月7日,17:26,Dream-底限  写道:
> >
> > hi
> > 是的,想以下面这种方式获取
> >
> > CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> > ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> >
> >
> > On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> >
> >> Hi,
> >> kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> >> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >>>
> >>
> >>> 在 2020年7月7日,17:12,Dream-底限  写道:
> >>>
> >>> kafka元数据
> >>
> >>
>
>


Re: flink kafka connector中获取kafka元数据

2020-07-07 文章 Dream-
hi
是的,想以下面这种方式获取

CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...)


On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:

> Hi,
>  kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
>
> 祝好,
> Leonard Xu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >
>
> > 在 2020年7月7日,17:12,Dream-底限  写道:
> >
> > kafka元数据
>
>


flink kafka connector中获取kafka元数据

2020-07-07 文章 Dream-
hi、
flink table/sql api中,有办法获取kafka元数据吗?

tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...))


Re: flink任务提交方式

2020-07-02 文章 Dream-
好的,感谢

On Thu, Jul 2, 2020 at 12:37 PM jianxu  wrote:

> 你可以看下这个项目https://github.com/todd5167/clusters-submiter,改造下应该满足你的需求。
> 在 2020-07-02 12:09:05,"Dream-底限"  写道:
> >hi
>
> >请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具
>


flink任务提交方式

2020-07-01 文章 Dream-
hi
请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具