Re: 如何 Flink 通过不同的hive_catalog,访问离线和实时两套集群的hive?

2021-09-02 Thread Jim Chen
你好,

你说的这些问题,不同集群的配置文件和hive版本,我都注意到了。

我们这边2套集群的版本都是一样的,配置文件,我也确认过是两套集群的

我的catalog配置文件如下:

catalogs:  # empty list
- name: hive_raltime_catalog  #实时集群
  type: hive
  hive-conf-dir: /usr/hdp/3.1.5.0-152/hive/conf/
  default-database: default
  hadoop-conf-dir: /usr/hdp/3.1.5.0-152/hadoop/conf/
- name: hive_offline_catalog #离线集群
  type: hive
  hive-conf-dir: /home/rtdev/jim/app/flink-1.13.1/offline_conf/hive/
  default-database: default
  hadoop-conf-dir: /home/rtdev/jim/app/flink-1.13.1/offline_conf/hadoop/


Rui Li  于2021年8月30日周一 上午11:59写道:

> 你好,
>
>
> 这个看起来是Hadoop的报错,连接失败的这个节点是NameNode么?创建HiveCatalog时可以指定Hadoop的配置文件的,要保证两个HiveCatalog读到的Hadoop配置是正确的。
>
> 另外使用多个HiveCatalog需要保证hive版本一致,不同版本的hive没办法一起用
>
> On Fri, Aug 27, 2021 at 3:01 PM Jim Chen 
> wrote:
>
> > Hi
> >
> > 集群上根本没有这个端口,也不知道这个端口是干嘛用的,为啥要连这个。这个是实时集群上的端口
> >
> > Caizhi Weng  于2021年8月27日周五 下午2:33写道:
> >
> > > Hi!
> > >
> > > 看起来是 Flink 集群不能访问到
> > wx12-dsjrt-master001/xx.xx.xx.xx:8050,检查一下网络以及这个端口的状态看看?
> > >
> > > Jim Chen  于2021年8月27日周五 下午1:59写道:
> > >
> > > > Hi, All
> > > >
> > > >
> > >
> >
> 我是使用的flink1.13.1版本,我们有2套Hadoop集群,离线和实时集群。现在实时集群上的任务,想通过hive_catalog访问离线集群。
> > > >   按照官网例子,我分别配置离线和实时的hive-conf-dir的不同路径,发现一只报错。如:
> > > > 2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
> > > > [] - Retrying connect to server:
> > > > wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
> > > > policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50,
> > > sleepTime=1000
> > > > MILLISECONDS)。
> > > >   大家在生产中是如何解决这种问题的呢?非常感谢!
> > > >
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: 如何 Flink 通过不同的hive_catalog,访问离线和实时两套集群的hive?

2021-08-27 Thread Jim Chen
Hi, Caizhi

你们有实时集群和离线集群分离的场景吗?你们是怎么做的?


Caizhi Weng  于2021年8月27日周五 下午2:33写道:

> Hi!
>
> 看起来是 Flink 集群不能访问到 wx12-dsjrt-master001/xx.xx.xx.xx:8050,检查一下网络以及这个端口的状态看看?
>
> Jim Chen  于2021年8月27日周五 下午1:59写道:
>
> > Hi, All
> >
> >
> 我是使用的flink1.13.1版本,我们有2套Hadoop集群,离线和实时集群。现在实时集群上的任务,想通过hive_catalog访问离线集群。
> >   按照官网例子,我分别配置离线和实时的hive-conf-dir的不同路径,发现一只报错。如:
> > 2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
> > [] - Retrying connect to server:
> > wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
> > policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50,
> sleepTime=1000
> > MILLISECONDS)。
> >   大家在生产中是如何解决这种问题的呢?非常感谢!
> >
>


Re: 如何 Flink 通过不同的hive_catalog,访问离线和实时两套集群的hive?

2021-08-27 Thread Jim Chen
Hi

集群上根本没有这个端口,也不知道这个端口是干嘛用的,为啥要连这个。这个是实时集群上的端口

Caizhi Weng  于2021年8月27日周五 下午2:33写道:

> Hi!
>
> 看起来是 Flink 集群不能访问到 wx12-dsjrt-master001/xx.xx.xx.xx:8050,检查一下网络以及这个端口的状态看看?
>
> Jim Chen  于2021年8月27日周五 下午1:59写道:
>
> > Hi, All
> >
> >
> 我是使用的flink1.13.1版本,我们有2套Hadoop集群,离线和实时集群。现在实时集群上的任务,想通过hive_catalog访问离线集群。
> >   按照官网例子,我分别配置离线和实时的hive-conf-dir的不同路径,发现一只报错。如:
> > 2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
> > [] - Retrying connect to server:
> > wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
> > policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50,
> sleepTime=1000
> > MILLISECONDS)。
> >   大家在生产中是如何解决这种问题的呢?非常感谢!
> >
>


Any one can help me? How to connect offline hadoop cluster and realtime hadoop cluster by different hive catalog?

2021-08-27 Thread Jim Chen
Hi, All
  My flink version is 1.13.1 and my company have two hadoop cluster,
offline hadoop cluster and realtime hadoop cluster. Now, on realtime hadoop
cluster, we want to submit flink job to connect offline hadoop cluster by
different hive catalog. I use different hive configuration diretory in hive
catalog configuration. The error log of flink job as follow:

  2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
  [] - Retrying connect to server:
realtime-cluster-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, sleepTime=1000
MILLISECONDS)。

  Any one can help me? Thanks!


如何 Flink 通过不同的hive_catalog,访问离线和实时两套集群的hive?

2021-08-26 Thread Jim Chen
Hi, All
  我是使用的flink1.13.1版本,我们有2套Hadoop集群,离线和实时集群。现在实时集群上的任务,想通过hive_catalog访问离线集群。
  按照官网例子,我分别配置离线和实时的hive-conf-dir的不同路径,发现一只报错。如:
2021-08-27 13:50:22,902 INFO  org.apache.hadoop.ipc.Client
[] - Retrying connect to server:
wx12-dsjrt-master001/xx.xx.xx.xx:8050. Already tried 6 time(s); retry
policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=50, sleepTime=1000
MILLISECONDS)。
  大家在生产中是如何解决这种问题的呢?非常感谢!


Re: Re: Re: Re: 通过savepoint重启任务,link消费kafka,有重复消息

2021-08-09 Thread Jim Chen
我看flink sql写到kafka的时候,也没有开启事务。所以,这一点,我非常想不通


东东  于2021年8月9日周一 下午5:56写道:

>
>
>
> 有未提交的事务才会出现重复呗,也许你重启的时候恰好所有事务都已经提交了呢,比如说,有一段时间流里面没有新数据进来。
>
>
> 在 2021-08-09 17:44:57,"Jim Chen"  写道:
> >有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
> >with savepoint,结果没有重复
> >
> >我理解,flink sql没有设置隔离级别的话,应该会出现重复的。Tell me why..
> >
> >东东  于2021年8月2日周一 下午7:13写道:
> >
> >> 从topic B实时写到hive,这个job需要配置 isolation.level 为
> >> read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
> >>
> >>
> >> 在 2021-08-02 19:00:13,"Jim Chen"  写道:
> >> >我不太懂,下游的isolation.level是不是read_committed是啥意思。
> >> >我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
> >> >B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
> >> >
> >> >东东  于2021年8月2日周一 下午6:20写道:
> >> >
> >> >> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
> >> >>
> >> >>
> >> >> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
> >> >> >Hi 刘建刚,
> >> >> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
> >> >> >停止命令:
> >> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >> >> >-yid application_1625497885855_703064 \
> >> >> >-p
> >> >>
> >> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> >\
> >> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >> >> >
> >> >> >重启命令:
> >> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> >-m yarn-cluster \
> >> >> >-yjm 4096 -ytm 4096 \
> >> >> >-ynm User_Click_Log_Split_All \
> >> >> >-yqu syh_offline \
> >> >> >-ys 2 \
> >> >> >-d \
> >> >> >-p 64 \
> >> >> >-s
> >> >>
> >> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >> >> >\
> >> >> >-n \
> >> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >>
> >> >>
> >>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >
> >> >> >
> >> >> >刘建刚  于2021年8月2日周一 下午3:49写道:
> >> >> >
> >> >> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> >> >> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
> >> >> >>
> >> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >> >> >>
> >> >> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
> >> >> >>
> >> >> >> > 我是通过savepoint的方式重启的,命令如下:
> >> >> >> >
> >> >> >> > cancel command:
> >> >> >> >
> >> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> >> >> > -yid application_1625497885855_698371 \
> >> >> >> > -s
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> >> > \
> >> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >> >> >
> >> >> >> > print savepoint:
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >> >
> >> >> >> >
> >> >> >> > restart command:
> >> >> >> >
> >> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> >> > -m yarn-cluster \
> >> >> >> > -yjm 4096 -ytm 4096 \
> >> >> >> > -ynm User_Click_Log_Split_All \
> >> >> >> > -yqu syh_offline \
> >> >> >> > -ys 2 \
> >

Re: Re: Re: 通过savepoint重启任务,link消费kafka,有重复消息

2021-08-09 Thread Jim Chen
有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
with savepoint,结果没有重复

我理解,flink sql没有设置隔离级别的话,应该会出现重复的。Tell me why..

东东  于2021年8月2日周一 下午7:13写道:

> 从topic B实时写到hive,这个job需要配置 isolation.level 为
> read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
>
>
> 在 2021-08-02 19:00:13,"Jim Chen"  写道:
> >我不太懂,下游的isolation.level是不是read_committed是啥意思。
> >我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
> >B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
> >
> >东东  于2021年8月2日周一 下午6:20写道:
> >
> >> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
> >>
> >>
> >> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
> >> >Hi 刘建刚,
> >> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
> >> >停止命令:
> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >> >-yid application_1625497885855_703064 \
> >> >-p
> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >\
> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >> >
> >> >重启命令:
> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >-m yarn-cluster \
> >> >-yjm 4096 -ytm 4096 \
> >> >-ynm User_Click_Log_Split_All \
> >> >-yqu syh_offline \
> >> >-ys 2 \
> >> >-d \
> >> >-p 64 \
> >> >-s
> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >> >\
> >> >-n \
> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >>
> >>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >
> >> >
> >> >刘建刚  于2021年8月2日周一 下午3:49写道:
> >> >
> >> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> >> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >> >>
> >> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
> >> >>
> >> >> > 我是通过savepoint的方式重启的,命令如下:
> >> >> >
> >> >> > cancel command:
> >> >> >
> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> >> > -yid application_1625497885855_698371 \
> >> >> > -s
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> > \
> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >> >
> >> >> > print savepoint:
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >
> >> >> >
> >> >> > restart command:
> >> >> >
> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> > -m yarn-cluster \
> >> >> > -yjm 4096 -ytm 4096 \
> >> >> > -ynm User_Click_Log_Split_All \
> >> >> > -yqu syh_offline \
> >> >> > -ys 2 \
> >> >> > -d \
> >> >> > -p 64 \
> >> >> > -s
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> > \
> >> >> > -n \
> >> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >> >
> >> >> >
> >> >>
> >>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >
> >> >> > Jim Chen  于2021年8月2日周一 下午2:01写道:
> >> >> >
> >> >> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> >> >> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> >> >> > >
> >> >> > > My Versions
> >> >> > > Flink 1.12.4
> >> >> > > Kafka 2.0.1
> >> >> 

Re: Re: 通过savepoint重启任务,link消费kafka,有重复消息

2021-08-02 Thread Jim Chen
我不太懂,下游的isolation.level是不是read_committed是啥意思。
我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了

东东  于2021年8月2日周一 下午6:20写道:

> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
>
>
> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
> >Hi 刘建刚,
> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
> >停止命令:
> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >-yid application_1625497885855_703064 \
> >-p
>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >\
> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >
> >重启命令:
> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >-m yarn-cluster \
> >-yjm 4096 -ytm 4096 \
> >-ynm User_Click_Log_Split_All \
> >-yqu syh_offline \
> >-ys 2 \
> >-d \
> >-p 64 \
> >-s
>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >\
> >-n \
> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >
> >
> >刘建刚  于2021年8月2日周一 下午3:49写道:
> >
> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >>
> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
> >>
> >> > 我是通过savepoint的方式重启的,命令如下:
> >> >
> >> > cancel command:
> >> >
> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> > -yid application_1625497885855_698371 \
> >> > -s
> >> >
> >> >
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> > \
> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >
> >> > print savepoint:
> >> >
> >> >
> >> >
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >
> >> >
> >> > restart command:
> >> >
> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> > -m yarn-cluster \
> >> > -yjm 4096 -ytm 4096 \
> >> > -ynm User_Click_Log_Split_All \
> >> > -yqu syh_offline \
> >> > -ys 2 \
> >> > -d \
> >> > -p 64 \
> >> > -s
> >> >
> >> >
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> > \
> >> > -n \
> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >
> >> >
> >>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >
> >> > Jim Chen  于2021年8月2日周一 下午2:01写道:
> >> >
> >> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> >> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> >> > >
> >> > > My Versions
> >> > > Flink 1.12.4
> >> > > Kafka 2.0.1
> >> > > Java 1.8
> >> > >
> >> > > Core code:
> >> > >
> >> > > env.enableCheckpointing(30);
> >> > >
> >> > >
> >> >
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> > >
> >> > >
> >> >
> >>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> > >
> >> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >> > >
> >> > > tableEnv.createTemporaryView("data_table",dataDS);
> >> > > String sql = "select * from data_table a inner join
> >> > > hive_catalog.dim.dim.project for system_time as of a.proctime as b
> on
> >> > a.id
> >> > > = b.id"
> >> > > Table table = tableEnv.sqlQuery(sql);
> >> > > DataStream resultDS = tableEnv.toAppendStream(table,
> >> Row.class).map(xx);
> >> > >
> >> > > // Kafka producer parameter
> >> > > Properties producerProps = new Properties();
> >> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> >> > > bootstrapServers);
> >> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> >> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> >> > kafkaBufferMemory);
> >> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> >> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> >> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
> 30);
> >> > >
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> >> > > "1");
> >> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> >> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> >> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> >> > >
> >> > > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> >> > > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> >> > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> >> > > .setParallelism(sinkParallelism);
> >> > >
> >> >
> >>
>


Re: 通过savepoint重启任务,link消费kafka,有重复消息

2021-08-02 Thread Jim Chen
Hi 刘建刚,
我使用了stop with savepoint,但是还是发现,下游有重复数据。
停止命令:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
-yid application_1625497885855_703064 \
-p
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
-d 55e7ebb6fa38faaba61b4b9a7cd89827

重启命令:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
\
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar


刘建刚  于2021年8月2日周一 下午3:49写道:

> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>
> Jim Chen  于2021年8月2日周一 下午2:33写道:
>
> > 我是通过savepoint的方式重启的,命令如下:
> >
> > cancel command:
> >
> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> > -yid application_1625497885855_698371 \
> > -s
> >
> >
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> > \
> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >
> > print savepoint:
> >
> >
> >
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >
> >
> > restart command:
> >
> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> > -m yarn-cluster \
> > -yjm 4096 -ytm 4096 \
> > -ynm User_Click_Log_Split_All \
> > -yqu syh_offline \
> > -ys 2 \
> > -d \
> > -p 64 \
> > -s
> >
> >
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> > \
> > -n \
> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >
> >
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >
> > Jim Chen  于2021年8月2日周一 下午2:01写道:
> >
> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> > >
> > > My Versions
> > > Flink 1.12.4
> > > Kafka 2.0.1
> > > Java 1.8
> > >
> > > Core code:
> > >
> > > env.enableCheckpointing(30);
> > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >
> > >
> >
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >
> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> > >
> > > tableEnv.createTemporaryView("data_table",dataDS);
> > > String sql = "select * from data_table a inner join
> > > hive_catalog.dim.dim.project for system_time as of a.proctime as b on
> > a.id
> > > = b.id"
> > > Table table = tableEnv.sqlQuery(sql);
> > > DataStream resultDS = tableEnv.toAppendStream(table,
> Row.class).map(xx);
> > >
> > > // Kafka producer parameter
> > > Properties producerProps = new Properties();
> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > bootstrapServers);
> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> > kafkaBufferMemory);
> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> > > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> > > "1");
> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> > >
> > > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> > > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> > > .setParallelism(sinkParallelism);
> > >
> >
>


Re: Any one can help me? Flink consume Kafka duplicate message via savepoint restart

2021-08-02 Thread Jim Chen
I restart flink job via savepoint. command as following:

cancel command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
59cf6ccc83aa163bd1e0cd3304dfe06a

print savepoint:

hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494


restart command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
\
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen  于2021年8月2日周一 下午1:51写道:

> Hi all, my flink job consume kafka topic A, and write to kafka topic B.
> When i restart my flink job via savepoint, topic B have some duplicate
> message. Any one can help me how to solve this problem? Thanks!
>
> My Versions:
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
> env.enableCheckpointing(30);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> tableEnv.createTemporaryView("data_table",dataDS);
> String sql = "select * from data_table a inner join
> hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
> = b.id"
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameter
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> "1");
> producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>
> resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> .setParallelism(sinkParallelism);
>


Re: 通过savepoint重启任务,link消费kafka,有重复消息

2021-08-02 Thread Jim Chen
我是通过savepoint的方式重启的,命令如下:

cancel command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
59cf6ccc83aa163bd1e0cd3304dfe06a

print savepoint:

hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494


restart command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
\
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen  于2021年8月2日周一 下午2:01写道:

> 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
>
> My Versions
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
>
> env.enableCheckpointing(30);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> tableEnv.createTemporaryView("data_table",dataDS);
> String sql = "select * from data_table a inner join
> hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
> = b.id"
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameter
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> "1");
> producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>
> resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> .setParallelism(sinkParallelism);
>


通过savepoint重启任务,link消费kafka,有重复消息

2021-08-02 Thread Jim Chen
大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!

My Versions
Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:

env.enableCheckpointing(30);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table",dataDS);
String sql = "select * from data_table a inner join
hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
= b.id"
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameter
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
.setParallelism(sinkParallelism);


Any one can help me? Flink consume Kafka duplicate message via savepoint restart

2021-08-01 Thread Jim Chen
Hi all, my flink job consume kafka topic A, and write to kafka topic B.
When i restart my flink job via savepoint, topic B have some duplicate
message. Any one can help me how to solve this problem? Thanks!

My Versions:
Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:
env.enableCheckpointing(30);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table",dataDS);
String sql = "select * from data_table a inner join
hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
= b.id"
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameter
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
.setParallelism(sinkParallelism);


Re: 大佬们,有遇到Flink cdc 同步MySQL中的数据,MySQL中的数据有变化,Flink SQL中的表没有实时同步到变化,是什么原因呢?

2020-10-08 Thread Jim Chen
能不能再具体一点,什么类型的表,做了什么样的操作???

chegg_work  于2020年10月2日周五 下午8:14写道:

>
>


Flink动态CEP该怎么做?

2020-09-03 Thread Jim Chen
Hi,
我们打算用flink来做规则匹配,现在打算用CEP来做。但是发现flink
不支持动态CEP,网上百度了下,类似于滴滴那种方式,改动太大。没有能力能做,所以,问下大家,有没有什么思路,简单点的


flink1.10集成PrometheusPushGateway,日志报错Failed to push metrics to PushGateway with jobName

2020-09-02 Thread Jim Chen
Hi:
我的环境是flink1.10.1,是基于yarn的per
job模式运行的。现在集成了PrometheusPushGateway,但是日志中,一直提示Failed to push metrics to
PushGateway with jobName。
具体报错日志为:
2020-09-02 15:11:21.901 application_1598509186865_0129 172.22.64.72
wx11-dsj-flink004 [Flink-MetricRegistry-thread-1] WARN o.a.f.m.p.
PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with
jobName realtime66b27e0fdfaa3860997abeb0170d84bb, groupingKey {}.
java.io.IOException: Response code from
http://172.16.24.146:9091/metrics/job/realtime66b27e0fdfaa3860997abeb0170d84bb
was 200
at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway
.doRequest(PushGateway.java:297)
at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway
.push(PushGateway.java:127)
at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
.report(PrometheusPushGatewayReporter.java:109)
at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(
MetricRegistryImpl.java:441)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:624)
at java.lang.Thread.run(Thread.java:748)


Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据

2020-09-01 Thread Jim Chen
能不能过restful api拿到pushgateway中metric上报的时间?通过这个时间动态判断删除group

xiao cai  于2020年9月1日周二 下午8:52写道:

> Hi shizk233:
> 我这边也复现了你说的情况,一模一样。
>
>  
> 可以尝试使用定时调度任务检查flink任务的执行情况,当不再处于运行状态时,主动调用pushgateway的delete方法来删除pushgetway的metrics。
>
>
>
>
>  原始邮件
> 发件人: shizk233
> 收件人: user-zh@flink.apache.org
> 发送时间: 2020年9月1日(周二) 19:10
> 主题: Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据
>
>
> Hi Xiao, 我这边实践过程中发现,该参数只能删除jobmanager对应的metrics group,不能删除tm的。
> 我们开启了randomJobNameSuffix,该参数会让JM和TM的metrics信息分属不同metrics group。
> 感觉这可能是一个bug? xiao cai  于2020年9月1日周二 下午4:57写道: > Hi: >
> 可以试试在flink-conf.yaml中添加: > metrics.reporter.promgateway.deleteOnShutdown:
> true > > > Best, > Xiao > 原始邮件 > 发件人: bradyMk > 收件人:
> user-zh > 发送时间: 2020年9月1日(周二) 16:50 > 主题: Re:
> 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据 > > >
> 您好,我不是很懂您的意思,例如我这边有一个这样的指标:flink_jobmanager_job_uptime 监控一个任务的运行时长; >
> 如果该任务被kill掉,那么这个指标的数值会变成一个不变的量,一直显示在grafana中。我不太会promeQL,我尝试这样: >
> flink_jobmanager_job_uptime[1m],这样是个非法查询命令,按照您的意思,应该怎么改呢? - Best Wishes
> > -- Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 Thread Jim Chen
我也是flink1.10.1的版本的,如果按照你的方法,每次启动一个任务,都要在flink-conf.yaml中修改一下`env.java.opts:
-Djob.name=xxx`吗?这样的话,是不是太麻烦了

zilong xiao  于2020年8月31日周一 下午12:08写道:

> 想问下你用的flink哪个版本呢?
> 如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
> jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
> 如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
> <
> http://dn-rt199.jja.bigo:8042/node/containerlogs/container_e19_1597907464753_1954_01_01/zengkejie/launch_container.sh/?start=-4096
> >脚本时,脚本中不再`export
>  _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
> `env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可
>
> 以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~
>
> Jim Chen  于2020年8月31日周一 上午11:33写道:
>
> > 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
> >
> >
> > zilong xiao  于2020年8月27日周四 下午7:24写道:
> >
> > > 如果是用CLI方式提交作业的话是可以做到的
> > >
> > > Jim Chen  于2020年8月27日周四 下午6:13写道:
> > >
> > > > 如果是自动以PatternLayout的话,我有几点疑问:
> > > >
> > > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > > >
> > > > 如果使用env的话
> > > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > > >
> > -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > > >
> > > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > > >
> > > > >
> > >
> 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > > 2:这些属性有办法可以从环境变量中获取
> > > > >
> > > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > > >
> > > > > > 大家好:
> > > > > >
>  我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > > >
> > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> > job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 Thread Jim Chen
我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到


zilong xiao  于2020年8月27日周四 下午7:24写道:

> 如果是用CLI方式提交作业的话是可以做到的
>
> Jim Chen  于2020年8月27日周四 下午6:13写道:
>
> > 如果是自动以PatternLayout的话,我有几点疑问:
> >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> >
> > 如果使用env的话
> > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> >
> > zilong xiao  于2020年8月25日周二 下午5:32写道:
> >
> > >
> 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > 2:这些属性有办法可以从环境变量中获取
> > >
> > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > >
> > > > 大家好:
> > > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > >
> > >
> >
>


Re: flink on yarn日志问题

2020-08-27 Thread Jim Chen
能分享一下demo吗?

Cayden chen <1193216...@qq.com> 于2020年7月15日周三 下午2:56写道:

> 我们的获取逻辑是通过定义
> logback的appder,appder通过解析当前系统路径(因为flink每个taskmanager会自己定义一个带有applicationId的路径,然后里面会放各种jar包,包括我自定义的appder),获取之后通过MDC.put(),给日志加一列appId,在appder里面把日志上报到外部的日志系统
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> rjia...@163.com;
> 发送时间:2020年7月14日(星期二) 下午5:31
> 收件人:"user-zh@flink.apache.org"
> 主题:回复:   flink on yarn日志问题
>
>
>
>
>
> 我们获取运行Yarn日志逻辑大概是这样的:
> 1.
> 访问rm-address/ws/v1/cluster/apps/applicationId,拿到amContainerLog中的url即为jm的url.
> 2.
> taskmanager日志url通过rm-address/proxy/applicationId/taskmanagers,拿到所有taskmanager的基本信息,替换amContainerLog中的containername和ip。
>
>
> 日志比较大时:指定读取的字节开始和结束位置,url?start=0end=1024
> | |
> jianxu
> |
> |
> rjia...@163.com
> |
> 在2020年07月14日 17:07,Cayden chen<1193216...@qq.com 写道:
> 有个问题,如何区分日志是哪个任务的呢
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:
> "user-zh"
>  发送时间:nbsp;2020年7月14日(星期二) 下午5:05
> 收件人:nbsp;"user-zh"
> 主题:nbsp;Re: Re: Re: flink on yarn日志问题
>
>
>
> Flink在1.11开始默认使用log4j2, log4j2已经有了很多appender[1]可以用来将日志输出到外部系统或服务。
>
> [1] https://logging.apache.org/log4j/2.x/manual/appenders.html
>
> Best,
> Yangze Guo
>
> On Tue, Jul 14, 2020 at 4:46 PM nicygan  gt;
> gt; 是有这个毛病,看TM日志不方便。
> gt;
> gt; 而且本地日志过几小时就会被清理,时间一久就看不到了,只剩JM日志。
> gt;
> gt;
> gt;
> gt;
> gt;
> gt;
> gt; 在 2020-07-14 12:35:06,"zhisheng"  写道:
> gt; gt;知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager
> 的日志(可以拼出路径),然后复制到本地去查看
> gt; gt;
> gt; gt;Yangze Guo  上午11:58写道:
> gt; gt;
> gt; gt;gt; Hi, 王松
> gt; gt;gt;
> gt; gt;gt; 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
> gt; gt;gt;
> gt; gt;gt; Best,
> gt; gt;gt; Yangze Guo
> gt; gt;gt;
> gt; gt;gt; On Tue, Jul 14, 2020 at 8:26 AM 王松 <
> sdlcwangson...@gmail.comgt; wrote:
> gt; gt;gt; gt;
> gt; gt;gt; gt; 我们也有问题 1,和 Yangze Guo
> 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
> gt; gt;gt; gt;
> gt; gt;gt; gt; Yangze Guo  于2020年7月13日周一 下午5:03写道:
> gt; gt;gt; gt;
> gt; gt;gt; gt; gt; 1.
> gt; gt;gt; gt; gt;
> gt; gt;gt;
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> gt; gt;gt; gt; gt; 2. 你是否需要调整一下重启策略[1]?
> 如果开启了ck,默认情况下就会一直尝试重启job
> gt; gt;gt; gt; gt;
> gt; gt;gt; gt; gt; [1]
> gt; gt;gt; gt; gt;
> gt; gt;gt;
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> gt; gt;gt; gt; gt;
> gt; gt;gt; gt; gt; Best,
> gt; gt;gt; gt; gt; Yangze Guo
> gt; gt;gt; gt; gt;
> gt; gt;gt; gt; gt;
> gt; gt;gt; gt; gt; On Mon, Jul 13, 2020 at 2:40
> PM 程龙 <13162790...@163.comgt; wrote:
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt; 不好意思nbsp;
> 怪我灭有描述清楚
> gt; gt;gt; gt; gt; gt; 1 目前开启日志收集功能
> gt; gt;gt; gt; gt; gt; 2 目前已是 per-job模式
> gt; gt;gt; gt; gt; gt; 3 集群使用cdh flink.1.10
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt; 在 2020-07-13
> 11:18:46,"Yangze Guo"  gt; gt;gt; gt; gt; gt; gt;Hi,
> gt; gt;gt; gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt;第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> gt; gt;gt; gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt;
> gt;第二个问题,您可以尝试一下per-job mode [2][3]
> gt; gt;gt; gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt; gt;[1]
> gt; gt;gt; gt; gt;
> gt; gt;gt;
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> gt
> ;
> gt;gt; gt; gt; gt; gt;[2]
> gt; gt;gt; gt; gt;
> gt; gt;gt;
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> gt
> ;
> gt;gt; gt; gt; gt; gt;[3]
> gt; gt;gt; gt; gt;
> gt; gt;gt;
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> gt
> ;
> gt;gt; gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt; gt;Best,
> gt; gt;gt; gt; gt; gt; gt;Yangze Guo
> gt; gt;gt; gt; gt; gt; gt;
> gt; gt;gt; gt; gt; gt; gt;On Mon, Jul
> 13, 2020 at 10:49 AM 程龙 <13162790...@163.comgt; wrote:
> gt; gt;gt; gt; gt; gt; gt;gt;
> gt; gt;gt; gt; gt; gt; gt;gt;
> 请问一下两个问题
> gt; gt;gt; gt; gt; gt; gt;gt; 1
> flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> gt; gt;gt; gt; gt; ,除了使用es收集日志的这种方案,
> 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> gt; gt;gt; gt; gt; gt; gt;gt; 2
> flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> gt; gt;gt; gt; gt; 

Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-27 Thread Jim Chen
如果是自动以PatternLayout的话,我有几点疑问:
1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化

如果使用env的话
1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
-cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?

zilong xiao  于2020年8月25日周二 下午5:32写道:

> 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> 2:这些属性有办法可以从环境变量中获取
>
> Jim Chen  于2020年8月25日周二 下午4:49写道:
>
> > 大家好:
> > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-27 Thread Jim Chen
如果使用env的话,我有几点疑问:
1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
-cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?

Yang Wang  于2020年8月26日周三 上午10:17写道:

> 我建议可以通env的方式传,在logback或者log4j配置中直接引用相应的env
>
> 例如,可以通过如下两个配置传递clusterId到环境变量
> containerized.master.env.clusterId=my-flink-cluster
> containerized.taskmanager.env.clusterId=my-flink-cluster
>
> 另外,也有一些内置的环境变量可以来使用
> _FLINK_CONTAINER_ID
> _FLINK_NODE_ID
>
>
> Best,
> Yang
>
> zilong xiao  于2020年8月25日周二 下午5:32写道:
>
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > 2:这些属性有办法可以从环境变量中获取
> >
> > Jim Chen  于2020年8月25日周二 下午4:49写道:
> >
> > > 大家好:
> > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > >
> >
>


关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-25 Thread Jim Chen
大家好:
我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
%msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?


Re: flink webui前端

2020-08-25 Thread Jim Chen
flinlk-runtime-web,这个module下

罗显宴 <15927482...@163.com> 于2020年8月25日周二 下午3:43写道:

> 大家好,请问flink的webui前端实现的源码在哪呀
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制


Re: flink1.11 sql问题

2020-08-25 Thread Jim Chen
这个需要你自定义UDF

酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:

> 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
>
>
>
>
> 在2020年08月25日 15:34,taochanglian 写道:
> flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
>
> 在 2020/8/25 14:59, 酷酷的浑蛋 写道:
> 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>


Re: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-25 Thread Jim Chen
  我们的维表,大概10个亿左右。每秒大概有4万的请求,要去查询,所以mysql扛不住。 还有就是维表数据变化后,需要秒级进行更新和关联的

china_tao  于2020年8月18日周二 下午11:13写道:

> 个人觉得还是取舍的问题,我们现在用flink sql 做实时数仓,维度表暂时用mysql,与业务商定好更新事件后,配置flink sql
> jdbc的lookup.cache.ttl参数来设置刷新时间,不知道你项目中,是维表数据变更后,需要秒级关联到消息中?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-25 Thread Jim Chen
这个不行的话

shizk233  于2020年8月17日周一 下午11:23写道:

> 有没有可能把维表数据也作为数据流从kafka输入呢
>
> Jim Chen  于2020年8月17日周一 下午4:36写道:
>
> > 大家好:
> > 我们现在在用flink
> sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的
> > 现在遇到的几个比较棘手的问题:
> > 1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决
> > 2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题
> > 3、hbase维表,可能5s后才会更新,但是此时kafka数据流已经过去了,关联的数据都是空
> >
> > 不知道,针对上面的场景,有什么好的解决思路或者方案
> >
>


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-17 Thread Jim Chen
hbase维表的数据量,大概500G

Dream-底限  于2020年8月13日周四 下午12:16写道:

> flink暴漏的lookup
>
> 是支持设置缓存记录条数和缓存时间的吧,把时间和条数设置的小一点或者直接禁用缓存,如果流表数据量不大的话可以不用异步访问,数据量大的话异步访问不加缓存维表存储引擎可能压力过大
>
> Jim Chen  于2020年8月13日周四 上午11:53写道:
>
> > 请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-17 Thread Jim Chen
大家好:
我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的
现在遇到的几个比较棘手的问题:
1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决
2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题
3、hbase维表,可能5s后才会更新,但是此时kafka数据流已经过去了,关联的数据都是空

不知道,针对上面的场景,有什么好的解决思路或者方案


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-12 Thread Jim Chen
请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的



--
Sent from: http://apache-flink.147419.n8.nabble.com/

HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-15 Thread Jim Chen
Hi,
  I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase,
report an error like validateSchemaAndApplyImplicitCast. Means that the
Query Schema and Sink Schema are inconsistent.
  Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink
Schema is Row(device_id). I don't know how to write in sql to be consistent
with hbase's sink schema.
  I try to write sql like select device_id as rowkey, ROW( device_id as
[cannot write as]  ) as f1

error message as follow:
[image: image.png]

sample code like:
HBase sink ddl:
String ddlSource = "CREATE TABLE
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
"  rowkey STRING,\n" +
"  f1 ROW< \n" +
"device_id STRING,\n" +
"pass_id STRING,\n" +
"first_date STRING,\n" +
"first_channel_id STRING,\n" +
"first_app_version STRING,\n" +
"first_server_time STRING,\n" +
"first_server_hour STRING,\n" +
"first_ip_location STRING,\n" +
"first_login_time STRING,\n" +
"sys_can_uninstall STRING,\n" +
"update_date STRING,\n" +
"server_time BIGINT,\n" +
"last_pass_id STRING,\n" +
"last_channel_id STRING,\n" +
"last_app_version STRING,\n" +
"last_date STRING,\n" +
"os STRING,\n" +
"attribution_channel_id STRING,\n" +
"attribution_first_date STRING,\n" +
"p_product STRING,\n" +
"p_project STRING,\n" +
"p_dt STRING\n" +
">\n" +
") WITH (\n" +
"  'connector.type' = 'hbase',\n" +
"  'connector.version' = '1.4.3',\n" + //
即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
"  'connector.table-name' =
'dw_common_mobile_device_user_mapping_new',\n" +
"  'connector.zookeeper.quorum' = '"+ zookeeperServers
+"',\n" +
"  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
"  'connector.write.buffer-flush.max-size' = '2mb',\n" +
"  'connector.write.buffer-flush.max-rows' = '1000',\n" +
"  'connector.write.buffer-flush.interval' = '2s'\n" +
")";

insert into sql:

String bodyAndLocalSql = "" +
//"insert into
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
"SELECT CAST(rowkey AS STRING) AS rowkey, " +
" ROW(" +
" device_id, pass_id, first_date, first_channel_id,
first_app_version, first_server_time, first_server_hour, first_ip_location,
first_login_time, sys_can_uninstall, update_date, server_time,
last_pass_id, last_channel_id, last_app_version, last_date, os,
attribution_channel_id, attribution_first_date, p_product, p_project, p_dt
" +
") AS f1" +
" FROM " +
"(" +
" SELECT " +
" MD5(CONCAT_WS('|', kafka.uid, kafka.p_product,
kafka.p_project)) AS rowkey, " +
" kafka.uid AS device_id " +
",kafka.pass_id " +

// first_date
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " +
// 老用户
" ELSE hbase.first_date END AS first_date " +

// first_channel_id
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.wlb_channel_id" +
// 老用户
" ELSE hbase.first_channel_id END AS first_channel_id " +

// first_app_version
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.app_version " +
// 老用户
" ELSE hbase.first_app_version END AS first_app_version " +

// first_server_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd
HH:mm:ss') " +
// 老用户
" ELSE hbase.first_server_time END AS first_server_time " +

// first_server_hour
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
// 老用户
" ELSE hbase.first_server_hour END AS first_server_hour " +

// first_ip_location

HELP,flink1.10 sql整合hbase,insert into时,提示validateSchemaAndApplyImplicitCast报错

2020-07-15 Thread Jim Chen
Hi,

我在使用flink1.10.1的sql功能,hbase的版本是1.4.3,写入hbase时,提示validateSchemaAndApplyImplicitCast报错,意思是Query的Schema和Sink的Schema不一致。主要是Query
Schema中的Row(EXPR$0),里面都是表达式。Sink
Schema中是Row(device_id)这种。我不知道,在sql中如何写,才能和hbase的sink schema保持一致。

我尝试了,类似于在select device_id as rowkey, ROW( device_id as 这里不能as ) as
f1,不写的话,Query 中ROW的 Schema都是表达式,不是具体定义的一个字段

这里query和sink的字段个数,是对上的。每个字段的类型也是对应上的。就是Query的Schema中是表达式,没法保持一致

报错信息如下:
[image: image.png]

关键代码:
HBase sink ddl:
String ddlSource = "CREATE TABLE
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
"  rowkey STRING,\n" +
"  f1 ROW< \n" +
"device_id STRING,\n" +
"pass_id STRING,\n" +
"first_date STRING,\n" +
"first_channel_id STRING,\n" +
"first_app_version STRING,\n" +
"first_server_time STRING,\n" +
"first_server_hour STRING,\n" +
"first_ip_location STRING,\n" +
"first_login_time STRING,\n" +
"sys_can_uninstall STRING,\n" +
"update_date STRING,\n" +
"server_time BIGINT,\n" +
"last_pass_id STRING,\n" +
"last_channel_id STRING,\n" +
"last_app_version STRING,\n" +
"last_date STRING,\n" +
"os STRING,\n" +
"attribution_channel_id STRING,\n" +
"attribution_first_date STRING,\n" +
"p_product STRING,\n" +
"p_project STRING,\n" +
"p_dt STRING\n" +
">\n" +
") WITH (\n" +
"  'connector.type' = 'hbase',\n" +
"  'connector.version' = '1.4.3',\n" + //
即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
"  'connector.table-name' =
'dw_common_mobile_device_user_mapping_new',\n" +
"  'connector.zookeeper.quorum' = '"+ zookeeperServers
+"',\n" +
"  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
"  'connector.write.buffer-flush.max-size' = '2mb',\n" +
"  'connector.write.buffer-flush.max-rows' = '1000',\n" +
"  'connector.write.buffer-flush.interval' = '2s'\n" +
")";

insert into sql:

String bodyAndLocalSql = "" +
//"insert into
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
"SELECT CAST(rowkey AS STRING) AS rowkey, " +
" ROW(" +
" device_id, pass_id, first_date, first_channel_id,
first_app_version, first_server_time, first_server_hour, first_ip_location,
first_login_time, sys_can_uninstall, update_date, server_time,
last_pass_id, last_channel_id, last_app_version, last_date, os,
attribution_channel_id, attribution_first_date, p_product, p_project, p_dt
" +
") AS f1" +
" FROM " +
"(" +
" SELECT " +
" MD5(CONCAT_WS('|', kafka.uid, kafka.p_product,
kafka.p_project)) AS rowkey, " +
" kafka.uid AS device_id " +
",kafka.pass_id " +

// first_date
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " +
// 老用户
" ELSE hbase.first_date END AS first_date " +

// first_channel_id
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.wlb_channel_id" +
// 老用户
" ELSE hbase.first_channel_id END AS first_channel_id " +

// first_app_version
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN kafka.app_version " +
// 老用户
" ELSE hbase.first_app_version END AS first_app_version " +

// first_server_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd
HH:mm:ss') " +
// 老用户
" ELSE hbase.first_server_time END AS first_server_time " +

// first_server_hour
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
" THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
// 老用户
" ELSE hbase.first_server_hour END AS first_server_hour " +

// first_ip_location
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// 新用户
 

HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

2020-07-05 Thread Jim Chen
Hi, everyone!

When i use flink1.10 to define table, and i want to define the json array
as the string type. But the query resutl is null when i execute the program.
The detail code as follow:

package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
//bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" +
"action STRING\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json',\n" +
//"'format.derive-schema' = 'true',\n" +
"'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3");
//Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// the result is null

bsEnv.execute("ARRAY tableFunction Problem");
}
}


flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空

2020-07-05 Thread Jim Chen
Hi,
可以通过以下步骤还原车祸现场:
kafka topic: test_action
kafka message:
  {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }

代码Problem2.java:
package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * 现在思路:就是在定义表的时候,把ARRYA看成STRING,
 * 现在的问题,就是查询出来,都是空
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" +
"action STRING\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json',\n" +
"'format.derive-schema' = 'false',\n" +
"'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3");
//Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem");
}
}


Re: flink1.10在通过TableFunction实现行转列时,Row一直是空

2020-07-05 Thread Jim Chen
Hi,
我现在转换思路,就是在定义表的时候,把ARRYA看成STRING,
那么,现在的问题,就是查询出来,都是空。

基于上面的代码环境,新写了一个类
Problem2.java

package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * 现在思路:就是在定义表的时候,把ARRYA看成STRING,
 * 现在的问题,就是查询出来,都是空
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" +
"action STRING\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json',\n" +
"'format.derive-schema' = 'false',\n" +
"'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3");
//Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem");
}
}

Jark Wu  于2020年7月6日周一 上午10:36写道:

> Hi,
>
> 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
> https://issues.apache.org/jira/browse/FLINK-17855
>
>
> Best,
> Jark
>
> On Mon, 6 Jul 2020 at 10:19, Jim Chen  wrote:
>
> > 大家好:
> > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> > 那么在eval方法接收到的就是Row[],
> > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >
> > 通过下面的步骤和代码可还原车祸场景:
> > kafka topic: test_action
> > kafka message:
> > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >
> > 代码1:Problem.java
> > package com.flink;
> >
> > import
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > /**
> >  *
> >  * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> >  * 那么在eval方法接收到的就是Row[],
> >  * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >  *
> >  * kafka topic: test_action
> >  *
> >  * kafka message:
> >  *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >  */
> > public class Problem {
> >
> > public static void main(String[] args) throws Exception {
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings envSettings =
> EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> > StreamTableEnvironment

flink1.10在通过TableFunction实现行转列时,Row一直是空

2020-07-05 Thread Jim Chen
大家好:
我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
那么在eval方法接收到的就是Row[],
问题出在,Row[]中的数据获取不到,里面的元素都是NULL

通过下面的步骤和代码可还原车祸场景:
kafka topic: test_action
kafka message:
{"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }

代码1:Problem.java
package com.flink;

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 *
 * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
 * 那么在eval方法接收到的就是Row[],
 * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
 *
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
 */
public class Problem {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode2", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable (\n" +
"action ARRAY<\n" +
"   ROW<" +
"   actionID STRING,\n" +
"   actionName STRING\n" +
"   >\n" +
"   >\n" +
") WITH (\n" +
"'connector.type' = 'kafka',\n" +
"'connector.version' = '0.11',\n" +
"'connector.topic' = 'test_action',\n" +
"'connector.startup-mode' = 'earliest-offset',\n" +
"'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
"'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
"'update-mode' = 'append',\n" +
"'format.type' = 'json'\n" +
")";
bsEnv.sqlUpdate(ddlSource);

//Table table = bsEnv.sqlQuery("select `action` from actionTable");
Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
TABLE(explode2(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print("==tb==");


bsEnv.execute("ARRAY tableFunction Problem");
}
}

代码2:ExplodeFunction.java
package com.flink;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;

public class ExplodeFunction extends TableFunction {

public void eval(Row[] values) {
System.out.println(values.length);
if (values.length > 0) {
for (Row row : values) {
if (row != null) {// 这里debug出来的row总是空
ArrayList list = new ArrayList<>();
for (int i = 0; i < row.getArity(); i++) {
Object field = row.getField(i);
list.add(field);
}

collector.collect(Row.of(Arrays.toString(list.toArray(;
}
}
}
}
}

最后贴个debug的图
[image: image.png]


Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-06-18 Thread Jim Chen
请问下,在flink sql1.10中, localcache+异步IO,这个方案,是直接写sql关联维表就行了吗?flink
sql会自动在底层做优化工作吗?如果要自己手动实现的话,有没有什么demo呢?谢谢

Jark Wu  于2020年6月17日周三 上午12:11写道:

> 如果更新非常频繁,又要保证关联的准确性,又要保证吞吐,那么最佳的解决方案我觉得只能是关联 changelog 了,
> 只是 Flink 目前还没有原生支持维表关联一个 changelog,会在Flink SQL 1.12中去支持。
>
> 当前版本下的话,可以尝试 keyby+localcache+异步IO。
>
> Best,
> Jark
>
> On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
>
> > 或者采用redis做维表存储介质。
> >
> > > 在 2020年6月16日,下午10:10,wangxiangyan  写道:
> > >
> > > hi,大家
> > > 维表被频繁更新,数据量1g左右,需要频繁同步,使用什么方案去关联比较好呢?
> >
>


在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout [30000]

2020-05-13 Thread Jim Chen
大家好,

我在使用flink1.7.2写入ES6的时候,有时会报错:request retries exceeded max retry timeout
[3],报错信息如下:
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at

Re: How to consume kafka from the last offset?

2020-03-26 Thread Jim Chen
Hi,
I am so sorry. It's not auto.offset.reset. Correctly, it is
*enable.auto.commit=false*

Best Wishs!

Dominik Wosiński  于2020年3月26日周四 下午4:20写道:

> Hey,
> Are You completely sure you mean *auto.offset.reset ??  *False is not
> valid setting for that AFAIK.
>
> Best,
> Dom.
>
> czw., 26 mar 2020 o 08:38 Jim Chen 
> napisał(a):
>
>> Thanks!
>>
>> I made a mistake. I forget to set the auto.offset.reset=false. It's my
>> fault.
>>
>> Dominik Wosiński  于2020年3月25日周三 下午6:49写道:
>>
>>> Hi Jim,
>>> Well, *auto.offset.reset *is only used when there is no offset saved
>>> for this *group.id <http://group.id>* in Kafka. So, if You want to read
>>> the data from the latest record (and by latest I mean the newest here) You
>>> should assign the *group.id <http://group.id>* that was not previously
>>> used and then FlinkKafkaConsumer should automatically fetch the last offset
>>> and start reading from that place.
>>>
>>>
>>> Best Regards,
>>> Dom.
>>>
>>> śr., 25 mar 2020 o 11:19 Jim Chen 
>>> napisał(a):
>>>
>>>> Hi, All
>>>>   I use flink-connector-kafka-0.11 consume the Kafka0.11. In
>>>> KafkaConsumer params, i set the group.id and auto.offset.reset. In the
>>>> Flink1.10, set the kafkaConsumer.setStartFromGroupOffsets();
>>>>   Then, i restart the application, found the offset is not from the
>>>> last position. Any one know where is wrong? HELP!
>>>>
>>>


When i use the Tumbling Windows, find lost some record

2020-03-26 Thread Jim Chen
Hi, All

  When i use the Tumbling Windows, find lost some record. My code as follow

*env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*

*env.addSource(FlinkKafkaConsumer011..)*







*.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor(Time.minutes(3)) {
  @Overridepublic long
extractTimestamp(JSONObject jsonObject) {long
logTime = jsonObject.getLongValue("logTime");return
logTime;}})*


*.keyBy(jsonObject -> {return
jsonObject.getString("userId");})*

*.timeWindow(Time.seconds(30))*

*.process(new ProcessWindowFunction() {*
* public void process(String key, Context context,
Iterable iterable, Collector collector) throws
Exception {*




*SimpleDateFormat sdf = new
SimpleDateFormat("-MM-dd HH:mm:ss");String
start = sdf.format(new Date(context.window().getStart()));
  String end = sdf.format(new Date(context.window().getEnd()));
System.out.println(start + "" + end);*
*for (JSONObject jsonObject : iterable) {*
*   collector.collect(jsonObject);*
*}}}*
*.print("");*

>From the print result, i found lost some record in the tumbling window. I
can't figure out, any one can help me ?


Re: How to consume kafka from the last offset?

2020-03-26 Thread Jim Chen
Thanks!

I made a mistake. I forget to set the auto.offset.reset=false. It's my
fault.

Dominik Wosiński  于2020年3月25日周三 下午6:49写道:

> Hi Jim,
> Well, *auto.offset.reset *is only used when there is no offset saved for
> this *group.id <http://group.id>* in Kafka. So, if You want to read the
> data from the latest record (and by latest I mean the newest here) You
> should assign the *group.id <http://group.id>* that was not previously
> used and then FlinkKafkaConsumer should automatically fetch the last offset
> and start reading from that place.
>
>
> Best Regards,
> Dom.
>
> śr., 25 mar 2020 o 11:19 Jim Chen 
> napisał(a):
>
>> Hi, All
>>   I use flink-connector-kafka-0.11 consume the Kafka0.11. In
>> KafkaConsumer params, i set the group.id and auto.offset.reset. In the
>> Flink1.10, set the kafkaConsumer.setStartFromGroupOffsets();
>>   Then, i restart the application, found the offset is not from the last
>> position. Any one know where is wrong? HELP!
>>
>


How to consume kafka from the last offset?

2020-03-25 Thread Jim Chen
Hi, All
  I use flink-connector-kafka-0.11 consume the Kafka0.11. In KafkaConsumer
params, i set the group.id and auto.offset.reset. In the Flink1.10, set
the kafkaConsumer.setStartFromGroupOffsets();
  Then, i restart the application, found the offset is not from the last
position. Any one know where is wrong? HELP!


Re: Flink1.10 Cluster's Received is zero in the web when consume from Kafka0.11

2020-03-25 Thread Jim Chen
Thanks for the tip!

May be call env.disableOperatorChaining() can show the received on the
dashborad

Chesnay Schepler  于2020年3月25日周三 下午5:56写道:

> This is a known limitation, see
> https://issues.apache.org/jira/browse/FLINK-7286 .
>
> As a crude workaround you may either break the chain after the source /
> before the sink, or query the numRecordsOut metric for the source /
> numRecordsIn metric for the sink via the WebUI metrics tab or REST API.
>
> On 25/03/2020 10:49, Jim Chen wrote:
>
> Hi, all
>   When I use flink-connector-kafka-0.11 consume Kafka0.11, the Cluster
> web's Received Record is always 0. However, the log is not empty. Any one
> can help me?
>
> [image: image.png]
>
>
>


Flink1.10 Cluster's Received is zero in the web when consume from Kafka0.11

2020-03-25 Thread Jim Chen
Hi, all
  When I use flink-connector-kafka-0.11 consume Kafka0.11, the Cluster
web's Received Record is always 0. However, the log is not empty. Any one
can help me?

[image: image.png]


Flink1.10版本消费Kafka0.11版本,页面监控received都是0

2020-03-25 Thread Jim Chen
请教一个问题:我使用的是Flink是1.10版本消费Kafka0.11版本,直接打印出来。Flink集群是standalong模式,页面监控上的received都是0,不知道怎么回事?