Re:Re:flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach









项目里引用的mysql:

   mysql
   mysql-connector-java
   5.1.46

使用的Mysql版本是5.7.18-log
如果mysql里面的字段是bigint,建表转换成int吗,会有截断风险吧








At 2020-06-11 13:39:18, "chaojianok"  wrote:
>检查一下你项目里引入的 MySQL 包的版本是否和你使用的 MySQL 版本一致,或者也可以直接转换一下数据类型。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>At 2020-06-11 13:22:07, "Zhou Zach"  wrote:
>>SLF4J: Class path contains multiple SLF4J bindings.
>>
>>SLF4J: Found binding in 
>>[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>>SLF4J: Found binding in 
>>[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>>explanation.
>>
>>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>>
>>ERROR StatusLogger No log4j2 configuration file found. Using default 
>>configuration: logging only errors to the console.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>>by default if explicit option isn't set. For compliance with existing 
>>applications not using SSL the verifyServerCertificate property is set to 
>>'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>>or set useSSL=true and provide truststore for server certificate verification.
>>
>>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>>server's identity verification is not recommended. According to MySQL 
>>5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL 

Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
flink版本是1.10.0,
mysql表:
CREATE TABLE `analysis_gift_consume` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `times` int(8) NOT NULL COMMENT '时间[MMdd]',
  `gid` int(4) NOT NULL DEFAULT '0' COMMENT '礼物ID',
  `gname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 
'礼物名称',
  `counts` bigint(20) NOT NULL DEFAULT '0' COMMENT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_unicode_ci COMMENT='';




CREATE TABLE `analysis_gift_consume1` (

  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,

  `times` int(8) NOT NULL COMMENT '时间[MMdd]',

  `gid` int(4) NOT NULL DEFAULT '0' COMMENT '礼物ID',

  `gname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 
'礼物名称',

  `counts` bigint(20) NOT NULL DEFAULT '0' COMMENT '',

  PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_unicode_ci COMMENT='';




数据库的字段是 bigint 类型,总有场景在mysql的字段设置为bigint吧,如果mysql的字段为bigint,那在创建flink 
sql时,用什么类型合适呢





At 2020-06-11 13:42:11, "Leonard Xu"  wrote:
>Hi,
>用的 flink 版本是多少? 数据库的字段确定是 bigint 类型吗?
>> Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast 
>> to java.lang.Long
>
>java.math.BigInteger 的范围比 java.lang.Long的范围大很多,是不能cast的,应该是你数据类型对应错误了,可以把mysql 
>表的schema贴下吗?
>
>
>祝好,
>Leonard Xu
>
>> 在 2020年6月11日,13:22,Zhou Zach  写道:
>> 
>> SLF4J: Class path contains multiple SLF4J bindings.
>> 
>> SLF4J: Found binding in 
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: Found binding in 
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> 
>> SLF4J: Actual binding is of type 
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> 
>> ERROR StatusLogger No log4j2 configuration file found. Using default 
>> configuration: logging only errors to the console.
>> 
>> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not 

Re: flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Leonard Xu
Hi,
用的 flink 版本是多少? 数据库的字段确定是 bigint 类型吗?
> Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast 
> to java.lang.Long

java.math.BigInteger 的范围比 java.lang.Long的范围大很多,是不能cast的,应该是你数据类型对应错误了,可以把mysql 
表的schema贴下吗?


祝好,
Leonard Xu

> 在 2020年6月11日,13:22,Zhou Zach  写道:
> 
> SLF4J: Class path contains multiple SLF4J bindings.
> 
> SLF4J: Found binding in 
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> 
> SLF4J: Found binding in 
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> 
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> 
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> 
> ERROR StatusLogger No log4j2 configuration file found. Using default 
> configuration: logging only errors to the console.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
> by default if explicit option isn't set. For compliance with existing 
> applications not using SSL the verifyServerCertificate property is set to 
> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
> or set useSSL=true and provide truststore for server certificate verification.
> 
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
> server's identity verification is not recommended. According to MySQL 
> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be 

Re:flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 chaojianok
检查一下你项目里引入的 MySQL 包的版本是否和你使用的 MySQL 版本一致,或者也可以直接转换一下数据类型。

















At 2020-06-11 13:22:07, "Zhou Zach"  wrote:
>SLF4J: Class path contains multiple SLF4J bindings.
>
>SLF4J: Found binding in 
>[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
>SLF4J: Found binding in 
>[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>explanation.
>
>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>
>ERROR StatusLogger No log4j2 configuration file found. Using default 
>configuration: logging only errors to the console.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
>
>Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 

flink sql bigint cannot be cast to mysql Long

2020-06-10 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Thu Jun 11 13:18:19 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to 

Re: keyed state在不同算子中使用

2020-06-10 文章 Congxian Qiu
你好
  现在 KeyedState 是不能跨算子使用的,也就是不同的算子使用的是不同的 state。
Best,
Congxian


Z-Z  于2020年6月11日周四 上午10:11写道:

> 请问,假设两个算子以相同的字段keyby,它们可以使用相同的StateDescriptor从而使用相同的的keyed state吗


keyed state????????????????

2020-06-10 文章 Z-Z
??keybyStateDescriptorkeyed
 state??

Re: kafka????????

2020-06-10 文章 ??????
Mikey

Re: flink 1.10SQL 报错问题求教

2020-06-10 文章 godfrey he
hi chenkaibit

欢迎将fix贡献回社区


chenkaibit  于2020年6月9日周二 上午10:34写道:

> 我也遇到过这个问题,这个可能是 checkpoint 的 bug,我修改了下 flink 源码后不报错了,可以参考下这个patch
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
>
> 在 2020-06-05 15:06:48,"hb" <343122...@163.com> 写道:
> >Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB),
> >但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环.
> >哪位帮忙看看,不胜感激.
> >
> >
> >2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
> SourceConversion(table=[default_catalog.default_database.user_visit_trace,
> source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
> fields=[userId, utp, utrp, extendFields, requestTime]) ->
> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
> where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")
> AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') >
> _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS
> NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
> joinType=[InnerJoin], async=[false], lookup=[user_id=userId],
> where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id,
> user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) /
> 1000) FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT
> _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT
> _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
> SinkConversionToTuple2 -> Sink: Unnamed (1/8)
> (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
> >java.lang.Exception: Could not perform checkpoint 401 for operator
> Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
> SourceConversion(table=[default_catalog.default_database.user_visit_trace,
> source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
> fields=[userId, utp, utrp, extendFields, requestTime]) ->
> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
> where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")
> AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') >
> _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS
> NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
> joinType=[InnerJoin], async=[false], lookup=[user_id=userId],
> where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id,
> user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) /
> 1000) FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT
> _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT
> _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
> SinkConversionToTuple2 -> Sink: Unnamed (1/8).
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown
> Source)
> >at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> >at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> >at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> >at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >at java.lang.Thread.run(Thread.java:745)
> >Caused by: java.lang.NullPointerException
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown
> Source)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> >... 12 more

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-10 文章 Congxian Qiu
Hi

从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
java.lang.IllegalStateException: Pending record count must be zero at this
point: 5”,需要看一下为什么会走到这里

Best,
Congxian


李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:

>
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
>
> > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> >
> > 补充一下,在TaskManager发现了如下错误日志:
> >
> > 2020-06-10 12:44:40,688 ERROR
> > org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> > during disposal of stream operator.
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> to
> > send data to Kafka: Pending record count must be zero at this point: 5
> > at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.IllegalStateException: Pending record count must be
> > zero at this point: 5
> > at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > ... 8 more
> >
> > 希望得到帮助,感谢!
> >
> >
> > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> >
> >> Hi all,
> >>
> >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> Field_Filter
> >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
> >>
> >>
> >>
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> >>
> >> 部分报错信息如下:
> >> 2020-06-10 12:02:49,083 INFO
> >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> >> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> >> 2020-06-10 12:04:47,898 INFO
> >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
> >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> >> c41f4811262db1c4c270b136571c8201 at
> >> container_e27_1591466310139_21670_01_06 @
> >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> >> 2020-06-10 12:04:47,899 INFO
> >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Discarding
> >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> Source_Map
> >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map ->
> Map
> >> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> >> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> >> at
> >>
> 

Re: 关于DataStreamUtils.reinterpretasKeyedStream的使用

2020-06-10 文章 Jark Wu
Hi,

你可以再用 DataStreamUtils.reinterpretAsKeyedStream(proStream, new
MyKeySelector2(groupKeys) ) 把它解释成 KeyedStream。
因为你的 flatmap 和 上游节点之间如果并发一样的话,运行时是会 chain 在一起的,所以key的分布没有变。

Best,
Jark

On Wed, 10 Jun 2020 at 21:15, Yichao Yang <1048262...@qq.com> wrote:

> Hi
>
>
> flatmap之后返回的本身就不是Keyedstream哈,keyby之后返回的才是keyedstream。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"绘梦飘雪"<318666...@qq.com;
> 发送时间:2020年6月10日(星期三) 晚上7:18
> 收件人:"user-zh"
> 主题:关于DataStreamUtils.reinterpretasKeyedStream的使用
>
>
>
> hi
> nbsp; 我有这样一个场景,以多个相同的key.做keyby,
> DataStream resStream =nbsp; demoStream.keyBy(groupKeys)
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .flatMap(new MyFlatmapFunction())
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .keyBy(groupKeys)
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .process(new MyProcessFunction())
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .keyBy(groupKeys)
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .timeWindow(Time.seconds(1))
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .aggregate(new MyAggFunction())
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .keyBy(groupKeys)
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .timeWindow(Time.seconds(1))
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> .process(new MyKeyProcessFunction());
>
> 我想通过DataStreamUtils.reinterpretasKeyedStream来改写这个流程,我自己尝试这样写下,,
> int[] groupKeys = new int[]{0,2,3};
>
> DataStream proStream =
> DataStreamUtils.reinterpretAsKeyedStream(demoStream, new
> MyKeySelector2(groupKeys) ) // MyKeySelector2
> 自己实现keySelector
> .flatMap(new MyFlatmapFunction())
> 我这样写发现数据流经过flatmap后并不是返回一个keyedstream
> ,请问是哪里有问题吗?请问该如何使用DataStreamUtils.reinterpretasKeyedStream


Re: kafka相关问题

2020-06-10 文章 Jark Wu
Hi,

我的理解是你想要获取 kafka 里面的最新一条数据,然后就结束?
类似于 kafka 的命令?
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic xxx
--max-messages 1

在 Flink 里面表达出来就是 select * from kafka limit 1 的批处理结果,只不过现在这个 query
会一直运行(流模式),不会结束。

Best,
Jarrk

On Wed, 10 Jun 2020 at 21:44, Mikey <359502...@qq.com> wrote:

> hi,可以取最新的一条数据:
>
> select id, last_value(value) over (partition by id order by id range
> between 1 prodding and current row ) as cur_value  from table_ddl
> 通过分组分组获取最新的一条数据。
> 具体可参考:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
> >
>
> > 在 2020年6月10日,下午3:18,方盛凯  写道:
> >
> > 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
> > 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
> > 我个人猜可能有两种方案:
> >
> 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
> > 2.定期向文件系统写入数据。
> >
> >
> > 小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:
> >
> >> 各位大佬好,请教一个问题:
> >> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl,是否由于'update-mode' =
> >>
> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
> >>
> >>
> >> table_ddl = """
> >> CREATE TABLE table_ddl(
> >> trck_id VARCHAR
> >> ) WITH (
> >> 'connector.type' = 'kafka',
> >> 'connector.version' = 'universal', 
> >> 'connector.topic' = 'w', 
> >> 'connector.startup-mode' = 'group-offsets',
> >> 'connector.properties.group.id' = 'trck_w',
> >> 'update-mode' = 'append',
> >> 'connector.properties.zookeeper.connect' = '*',
> >> 'connector.properties.bootstrap.servers' = '%#',
> >> 'format.type' = 'json',  
> >> 'format.derive-schema' = 'true'
> >> )
> >> """
>
>


Re: kafka相关问题

2020-06-10 文章 Mikey
hi,可以取最新的一条数据:

select id, last_value(value) over (partition by id order by id range between 1 
prodding and current row ) as cur_value  from table_ddl 
通过分组分组获取最新的一条数据。
具体可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
 


> 在 2020年6月10日,下午3:18,方盛凯  写道:
> 
> 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
> 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
> 我个人猜可能有两种方案:
> 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
> 2.定期向文件系统写入数据。
> 
> 
> 小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:
> 
>> 各位大佬好,请教一个问题:
>> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl,是否由于'update-mode' =
>> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>> 
>> 
>> table_ddl = """
>> CREATE TABLE table_ddl(
>> trck_id VARCHAR
>> ) WITH (
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal', 
>> 'connector.topic' = 'w', 
>> 'connector.startup-mode' = 'group-offsets',
>> 'connector.properties.group.id' = 'trck_w',
>> 'update-mode' = 'append',
>> 'connector.properties.zookeeper.connect' = '*',
>> 'connector.properties.bootstrap.servers' = '%#',
>> 'format.type' = 'json',  
>> 'format.derive-schema' = 'true'
>> )
>> """



Re: kafka????????

2020-06-10 文章 ??????
kafka

??????????DataStreamUtils.reinterpretasKeyedStream??????

2020-06-10 文章 Yichao Yang
Hi


flatmapKeyedstreamkeyby??keyedstream??


Best,
Yichao Yang




----
??:""<318666...@qq.com;
:2020??6??10??(??) 7:18
??:"user-zh"

Re: 关于flinksql between问题

2020-06-10 文章 Leonard Xu
> 
> 非常感谢,使用的flink1.10.0 中流转成表是是否有字段的数目的限制 我把流转成表 我的流是一个封装的实体类

转换的时候没有这个字段数目的限制的,另外看你的字段也不是很多,一般业务上几百个字段都正常的,你检查下你字段的对应关系

祝好,
Leonard Xu


> tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
>   
> tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
>  
> 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
>   tnv.registerFunction("ip_to_num",IPtoNum)
> 
> 在转成表时 如下错误
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig 
>  for more info.
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Too many fields referenced from an atomic type.
>   at 
> org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
>   at 
> org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
>   at 
> org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
>   at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
>   at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
>   at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
>   at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
>   at 
> com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
>   at 
> com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
> 
> 
> 
> 
> 
> --原始邮件--
> 发件人:"Leonard Xu"mailto:xbjt...@gmail.com>;
> 发送时间:2020年6月10日(星期三) 中午1:16
> 收件人:"user-zh" ;
> 
> 主题:Re: 关于flinksql between问题
> 
> 
> 
> Hi,
> 
> 看你描述的想要的是自定义source(左表), 需要同一张mysql 
> 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular join, 维表join的语法[1]:
> 
> SELECT
>  o.amout, o.currency, r.rate, o.amount * r.rate
> FROM
>  Orders AS o
>  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>  ON r.currency = o.currency
> 另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 
> 参数控制维表中cache的过期时间,不知道是否满足你的需求。
> 
> Best,
> Leonard Xu
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
>  
>   
> ;
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
>  
> 
>  
>   
> ;
> 
>  在 2020年6月10日,10:43,小屁孩 <932460...@qq.com  
> 写道:
>  
>  hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 
> 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗?
>  
>  
>  
>  
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"wangweigu...@stevegame.cn 
> " gt;;
>  发送时间:nbsp;2020年6月9日(星期二) 晚上6:35
>  收件人:nbsp;"user-zh" gt;;
>  
>  主题:nbsp;回复: 回复: 关于flinksql between问题
>  
>  
>  
>  
>  nbsp; 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的!
>  nbsp; 
>  会报你下面的错误:
>  nbsp; Exception in thread "main" 
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
>  
>  LogicalProject(num=[$0])
>  nbsp; LogicalJoin(condition=[AND(gt;($0, $1), <($0, $2))], 
> joinType=[inner])
>  nbsp;nbsp;nbsp; FlinkLogicalDataStreamScan(id=[1], 
> fields=[num])
>  nbsp;nbsp;nbsp; FlinkLogicalDataStreamScan(id=[2], 
> fields=[startNum, endNum])
>  
>  This exception indicates that the query uses an unsupported SQL feature.
>  
>  
>  
>  
>  nbsp;
>  发件人: 小屁孩
>  发送时间: 2020-06-09 17:41
>  收件人: user-zh
>  主题: 回复: 关于flinksql between问题
>  hi,我使用的是amp;nbsp;
>  1 flink1.9.0
>  2 oldplanner
>nbsp;
>  nbsp;
>nbsp;
>  3 

回复:kafka相关问题

2020-06-10 文章 kcz
你这个表达,实时kafka的一条记录,你要最新的那个是吧,你最新的判断标准是什么?根据什么特性来,表达清楚一点哇。





-- 原始邮件 --
发件人: 小学生 <201782...@qq.com
发送时间: 2020年6月10日 18:15
收件人: user-zh 

Re:flink??????????????????

2020-06-10 文章 Matt Wang
kafka ?? 0.11.0 ?? flink  EXACTLY-ONCE?? 
send ?? kafka commit 
?? kafka 
??  isolation.level 
 commit 


---
Best,
Matt Wang


On 06/10/2020 14:28??Yichao Yang<1048262...@qq.com> wrote??
Hi


sink 
??kafkakafka1.0??kafkaEXACTLY-ONCE


Best,
Yichao Yang




----
??:"??"

关于DataStreamUtils.reinterpretasKeyedStream的使用

2020-06-10 文章 绘梦飘雪
hi
 我有这样一个场景,以多个相同的key.做keyby,
DataStream resStream = demoStream.keyBy(groupKeys)
  .flatMap(new MyFlatmapFunction())
  .keyBy(groupKeys)
  .process(new MyProcessFunction())
  .keyBy(groupKeys)
  .timeWindow(Time.seconds(1))
  .aggregate(new MyAggFunction())
  .keyBy(groupKeys)
  .timeWindow(Time.seconds(1))
  .process(new MyKeyProcessFunction());

我想通过DataStreamUtils.reinterpretasKeyedStream来改写这个流程,我自己尝试这样写下,,
int[] groupKeys = new int[]{0,2,3};

DataStream proStream = DataStreamUtils.reinterpretAsKeyedStream(demoStream, new 
MyKeySelector2(groupKeys) )  //  MyKeySelector2 自己实现keySelector 
.flatMap(new MyFlatmapFunction())
我这样写发现数据流经过flatmap后并不是返回一个keyedstream
,请问是哪里有问题吗?请问该如何使用DataStreamUtils.reinterpretasKeyedStream

Re: FLINK SQL文档示例是否正确

2020-06-10 文章 Jark Wu
Good catch! 在 Flink 中需要用 mod(a, 4) 做取余运算。 % 不是一个 SQL 标准操作符。

我开了个 issue 去跟进这个问题:要么改文档,要么允许 % 操作符。

https://issues.apache.org/jira/browse/FLINK-18240

Best,
Jark

On Wed, 10 Jun 2020 at 18:34, 张韩  wrote:

> 问题:
> 文档(
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#scan-projection-and-filter)
> 使用'%'求余运算,在SQL解析报错:
> org.apache.calcite.sql.parser.SqlParseException: Percent remainder '%' is
> not allowed under the current SQL conformance level
>
>
> MYSQL conformance 支持'%'运算,使用MYSQL
> CONFORMANCE,在SQL语法校验过程会找不到'%'对应的SqlOperator
>
>
>
>
>
>


FLINK SQL文档示例是否正确

2020-06-10 文章 张韩
问题:
文档(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#scan-projection-and-filter)使用'%'求余运算,在SQL解析报错:
org.apache.calcite.sql.parser.SqlParseException: Percent remainder '%' is not 
allowed under the current SQL conformance level


MYSQL conformance 支持'%'运算,使用MYSQL CONFORMANCE,在SQL语法校验过程会找不到'%'对应的SqlOperator







Re: kafka????????

2020-06-10 文章 ??????
??kafkamysql??where??kafka

Re: kafka相关问题

2020-06-10 文章 Jingsong Li
Hi, 小学生

你可以仔细描述下你的业务场景吗?然后再描述下问题,没懂到底是想要什么。

Best,
Jingsong Lee

On Wed, Jun 10, 2020 at 3:46 PM 方盛凯  wrote:

> 那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
> 如有错误欢迎指正
>
> 小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道:
>
> > 您好,我是通过select * from
> > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
>


-- 
Best, Jingsong Lee


Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-10 文章 李奇
哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。

> 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> 
> 补充一下,在TaskManager发现了如下错误日志:
> 
> 2020-06-10 12:44:40,688 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Pending record count must be zero at this point: 5
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be
> zero at this point: 5
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> ... 8 more
> 
> 希望得到帮助,感谢!
> 
> 
> Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> 
>> Hi all,
>> 
>> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
>> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
>> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>> 
>> 
>> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>> 
>> 部分报错信息如下:
>> 2020-06-10 12:02:49,083 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
>> 2020-06-10 12:04:47,898 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
>> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
>> c41f4811262db1c4c270b136571c8201 at
>> container_e27_1591466310139_21670_01_06 @
>> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
>> 2020-06-10 12:04:47,899 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map
>> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map
>> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>> at 

Re: kafka????????

2020-06-10 文章 ??????
??table_ddlselect * from 
table_ddl

Re: kafka相关问题

2020-06-10 文章 方盛凯
那你有没有尝试过修改connector中property中connector.startup-mode
设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。
另外,我想问一下 你的sql是一直运行的吗?
我给的limit方案是一个upersert流。

小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道:

> limit 没有用呀。有没有切实可行的方案呢,pyflink下。


Re: sink mysql 失败

2020-06-10 文章 李奇
用户名密码没有设置。

> 在 2020年6月10日,下午5:42,Zhou Zach  写道:
> 
> 感谢回复!忘记设置用户名和密码了。。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn" 
>  wrote:
>> 
>> Caused by: java.sql.SQLException: Access denied for user ''@'localhost' 
>> (using password: NO)
>> 得指定下有操作mysql这个表的权限账号了!
>> 
>> 
>> 
>> 发件人: Zhou Zach
>> 发送时间: 2020-06-10 16:32
>> 收件人: Flink user-zh mailing list
>> 主题: sink mysql 失败
>> SLF4J: Class path contains multiple SLF4J bindings.
>> 
>> SLF4J: Found binding in 
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: Found binding in 
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> 
>> SLF4J: Actual binding is of type 
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> 
>> ERROR StatusLogger No log4j2 configuration file found. Using default 
>> configuration: logging only errors to the console.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 

Re:回复: sink mysql 失败

2020-06-10 文章 Zhou Zach
感谢回复!忘记设置用户名和密码了。。

















At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn"  
wrote:
>
>Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using 
>password: NO)
>得指定下有操作mysql这个表的权限账号了!
>
>
> 
>发件人: Zhou Zach
>发送时间: 2020-06-10 16:32
>收件人: Flink user-zh mailing list
>主题: sink mysql 失败
>SLF4J: Class path contains multiple SLF4J bindings.
> 
>SLF4J: Found binding in 
>[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> 
>SLF4J: Found binding in 
>[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> 
>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>explanation.
> 
>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> 
>ERROR StatusLogger No log4j2 configuration file found. Using default 
>configuration: logging only errors to the console.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option isn't set. For compliance with existing applications not 
>using SSL the verifyServerCertificate property is set to 'false'. You need 
>either to explicitly disable SSL by setting useSSL=false, or set useSSL=true 
>and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>server's identity verification is not recommended. According to MySQL 5.5.45+, 
>5.6.26+ and 5.7.6+ requirements SSL connection must be established by default 
>if explicit option 

Re: kafka????????

2020-06-10 文章 ??????
limit ??pyflink

Re: Flink sql 状态清理问题

2020-06-10 文章 Benchao Li
Hi,

Join算子的state是支持清理的。
可以提供下以下信息:
- Flink 版本
- planner (blink planner / old planner)

op <520075...@qq.com> 于2020年6月10日周三 下午4:08写道:

> hi,
> 写了个测试程序:
>
> ..
>
> val tConfig = bstEnv.getConfig
>
> confg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))
>
> ..
>
> val q1=bstEnv.sqlQuery(
>   """select createTime,feedid from source
> |where circleName is not null
> |and circleName not in('','_')
> |and action = 'C_FEED_EDIT_SEND'
> |""".stripMargin)
>
>
>  bstEnv.createTemporaryView("sourcefeed",q1)
> val q2=bstEnv.sqlQuery(
>   """select feedid,postfeedid,action from source
> |where circleName is not null
> |and circleName not in('','_')
> |and action in('C_PUBLISH','C_FORWARD_PUBLISH')
> |""".stripMargin)
>
> bstEnv.createTemporaryView("postfeed",q2)
> bstEnv.sqlQuery(
>   """
> |select count(b.postfeedid) from
> |sourcefeed a
> |join postfeed b
> |on a.feedid=b.postfeedid
>   """.stripMargin).toRetractStream[Row](confg).print("")
>
>
> //
>
> 程序里面设置了状态失效最晚时间是空闲25分钟,但是运行了几天了,我再web上观察到的状态一直再不断增加,可以确定关联的id最多只会活跃1个小时左右,请问是哪里没设置对还是join两边的state不支持清理?
>
>


Re: TTL 支不支持自然日

2020-06-10 文章 Jingsong Li
Hi,

我觉得可以有基于watermark的状态清理这种机制。

但是 SQL的语义不太好描述这种机制,所以业务上能不能算出一个day来加上?比如group by的字段加上这个day,这样可以隔天后数据独立?

Best,
Jingsong Lee

On Wed, Jun 10, 2020 at 3:12 PM star <3149768...@qq.com> wrote:

> 感谢您的建议,实时和离线的sink的目标表是一样的。
> 举个场景:
> 比如计算用户购买商品的种类数量(select count(distinct product) from
> order),这种计算需要基于历史数据,就是要把用户的历史订单都拿到实时来计算。
> 用户量有2亿,每天日活只有不到100w,为了节省计算资源只计算当天有过购买行为的用户,所以到了凌晨把状态清0,从新计算今天的用户。
>
>
>
>
>
>
>
>
> --原始邮件--
> 发件人:"Yichao Yang"<1048262...@qq.com;
> 发送时间:2020年6月10日(星期三) 下午2:45
> 收件人:"user-zh"
> 主题:回复:TTL 支不支持自然日
>
>
>
> Hi
>
>
> 根据你的场景来看,你们是每天0点之后会用离线任务再去覆盖实时的数据,所以我理解等批结束之后,你们的实时任务会从0点开始重新消费数据?
>
> 你的担心是说0点过后状态没有清理,那么我理解你用到的状态是自定义的状态并且和时间属性有关,那么其实可以做的一个操作就是你可以判断当前数据的时间戳和状态中存储数据的时间戳,如果不是同一天那么把状态清空即可,这样就做到了自定义状态每天0点自动清除。
>
>
>
> 我不知道你们的场景以及技术选型,所以我不太明白你们为什么会为了让离线任务覆盖实时计算结果而去把实时任务停止,因为我的理解是离线和实时不冲突,应该是互补的,并且他们的sink应该不同?
>
>
> Best,
> Yichao Yang
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"star"<3149768...@qq.comgt;;
> 发送时间:nbsp;2020年6月10日(星期三) 下午2:34
> 收件人:nbsp;"user-zh@flink.apache.org"
> 主题:nbsp;TTL 支不支持自然日
>
>
>
> 您好,
> 可以不可以按照自然日来清理状态,我们的离线任务每天零点后会跑批覆盖实时计算的结果,实时任务每天凌晨也会停止,等跑批结束后再启动。有没有根据具体时间来清理状态的功能,比如每天到0:00自动清理昨天到状态,我就不用每天重启作业了。



-- 
Best, Jingsong Lee


回复: sink mysql 失败

2020-06-10 文章 wangweigu...@stevegame.cn

Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using 
password: NO)
得指定下有操作mysql这个表的权限账号了!


 
发件人: Zhou Zach
发送时间: 2020-06-10 16:32
收件人: Flink user-zh mailing list
主题: sink mysql 失败
SLF4J: Class path contains multiple SLF4J bindings.
 
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
 
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
 
ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting 

??????flink??????????????????

2020-06-10 文章 1193216154
read uncommit ??read commint.
read uncommit ??flink??commit??
read commit??




----
??:"??"

sink mysql 失败

2020-06-10 文章 Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Exception in thread "main" java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job 

????: ?????? ????flinksql between????

2020-06-10 文章 wangweigu...@stevegame.cn

  ??valuemysqlinst


 
 ??
?? 2020-06-10 15:25
 user-zh
?? ?? flinksql between
flink1.10.0 ??  

tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
  
tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
 
'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
  tnv.registerFunction("ip_to_num",IPtoNum)
 
?? 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too 
many fields referenced from an atomic type.
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
 
 
 
 
 
----
??:"Leonard Xu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 


Flink sql ????????????

2020-06-10 文章 op
hi??



..
val tConfig = 
bstEnv.getConfigconfg.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(25))..val
 q1=bstEnv.sqlQuery(
  """select createTime,feedid from source
|where circleName is not null
|and circleName not in('','_')
|and action = 'C_FEED_EDIT_SEND'
|""".stripMargin)
 bstEnv.createTemporaryView("sourcefeed",q1)
val q2=bstEnv.sqlQuery(
  """select feedid,postfeedid,action from source
|where circleName is not null
|and circleName not in('','_')
|and action in('C_PUBLISH','C_FORWARD_PUBLISH')
|""".stripMargin)

bstEnv.createTemporaryView("postfeed",q2)
bstEnv.sqlQuery(
  """
|select count(b.postfeedid) from
|sourcefeed a
|join postfeed b
|on a.feedid=b.postfeedid
  """.stripMargin).toRetractStream[Row](confg).print("")
//25webid1??join??state

Re: kafka相关问题

2020-06-10 文章 方盛凯
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
如有错误欢迎指正

小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道:

> 您好,我是通过select * from
> table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)


?????? ????flinksql between????

2020-06-10 文章 ??????
flink1.10.0 ??  

tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
  
tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
 
'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
  tnv.registerFunction("ip_to_num",IPtoNum)

?? 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too 
many fields referenced from an atomic type.
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)





----
??:"Leonard Xu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 


Re: kafka????????

2020-06-10 文章 ??????
??select * from 
table_ddl??table_ddl??select
 * from 
table_ddl??table_ddl??

Re: kafka相关问题

2020-06-10 文章 方盛凯
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
我个人猜可能有两种方案:
1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
2.定期向文件系统写入数据。


小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:

> 各位大佬好,请教一个问题:
> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl,是否由于'update-mode' =
> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>
>
> table_ddl = """
> CREATE TABLE table_ddl(
> trck_id VARCHAR
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal', 
> 'connector.topic' = 'w', 
> 'connector.startup-mode' = 'group-offsets',
> 'connector.properties.group.id' = 'trck_w',
> 'update-mode' = 'append',
> 'connector.properties.zookeeper.connect' = '*',
> 'connector.properties.bootstrap.servers' = '%#',
> 'format.type' = 'json',  
> 'format.derive-schema' = 'true'
> )
> """


??????TTL ??????????????

2020-06-10 文章 star
??sink??
??
select count(distinct product) from 
order),??
2100w0??








----
??:"Yichao Yang"<1048262...@qq.com;
:2020??6??10??(??) 2:45
??:"user-zh"

Re: Re: flink如何传递全局变量

2020-06-10 文章 zjfpla...@hotmail.com
我看到这篇文章介绍的比较详细:https://www.cnblogs.com/029zz010buct/p/10362451.html


zjfpla...@hotmail.com
 
发件人: Px New
发送时间: 2020-06-10 09:54
收件人: user-zh
主题: Re: flink如何传递全局变量
对
正如 -> 1048262223  所说的一样 , 目前我就是通过BroadCast 动态更细一些规则带到下游并在Process method 中
进行操作 | 
 
zjfpla...@hotmail.com  于2020年6月9日周二 下午8:14写道:
 
> hi,
> 请问flink如何传递全局变量,静态类好像服务器端运行不行。
> 场景是:一开始flink程序起来时,读取配置文件中的配置项,此类配置项可能会在sink,source等等其他地方用到,算是整个程序的全局配置
>
>
>
> zjfpla...@hotmail.com
>


kafka????????

2020-06-10 文章 ??????
??
kafka??flink??table_ddl??'update-mode'
 = 
'append'??kafka??table_ddl??kafkatable_ddl??table_ddl??kafka??


table_ddl = """
CREATE TABLE table_ddl(
trck_id VARCHAR
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal', 
'connector.topic' = 'w', 
'connector.startup-mode' = 'group-offsets',
'connector.properties.group.id' = 'trck_w',
'update-mode' = 'append',
'connector.properties.zookeeper.connect' = '*',
'connector.properties.bootstrap.servers' = '%#',
'format.type' = 'json',  
'format.derive-schema' = 'true'
)
"""

??????TTL ??????????????

2020-06-10 文章 Yichao Yang
Hi


00
00


sink?


Best,
Yichao Yang




----
??:"star"<3149768...@qq.com;
:2020??6??10??(??) 2:34
??:"user-zh@flink.apache.org"

TTL ??????????????

2020-06-10 文章 star
?? 
??0:00

??????flink??????????????????

2020-06-10 文章 Yichao Yang
Hi


sink 
??kafkakafka1.0??kafkaEXACTLY-ONCE


Best,
Yichao Yang




----
??:"??"

native kubernetes ClusterRoleBinding 过期问题咨询

2020-06-10 文章 a511955993

hi all

使用的版本是flink 1.10.1 ,kubernetes 版本 1.17

构建了一个session集群,也有正常赋权,可以正常提交作业并运行作业。隔一段时间后,重新提交作业会出现无法创建新的TM的现象。需要重新执行kubectl 
apply -f rbac.yaml 将账号和角色进行绑定后才可以正常创建TM。

对应的rbac.yaml如下


apiVersion: v1
kind: ServiceAccount
metadata:
 name: flink
 namespace: flink-collect-metric
---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
 name: flink-role-binding
roleRef:
 apiGroup: rbac.authorization.k8s.io
 kind: ClusterRole
 name: edit
subjects:
- kind: ServiceAccount
 name: flink
 namespace: flink-collect-metric


报错信息如下:


2020-06-10 14:09:14,664 ERROR 
org.apache.flink.kubernetes.KubernetesResourceManager - Could not start 
TaskManager in pod flink-collect-metric-taskmanager-1-509.
java.util.concurrent.CompletionException: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST 
at: https://10.96.0.1/api/v1/namespaces/flink-collect-metric/pods. Message: 
Forbidden!Configured service account doesn't have access. Service account may 
have been revoked. pods is forbidden: User 
"system:serviceaccount:flink-collect-metric:flink" cannot create resource 
"pods" in API group "" in the namespace "flink-collect-metric".
 at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: 
https://10.96.0.1/api/v1/namespaces/flink-collect-metric/pods. Message: 
Forbidden!Configured service account doesn't have access. Service account may 
have been revoked. pods is forbidden: User 
"system:serviceaccount:flink-collect-metric:flink" cannot create resource 
"pods" in API group "" in the namespace "flink-collect-metric".
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:510)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:447)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:413)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:372)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:241)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:798)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:328)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:324)
 at 
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$createTaskManagerPod$0(Fabric8FlinkKubeClient.java:184)
 at 
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
 ... 3 more

Looking forward to your reply and help.

Best
| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

flink??????????????????

2020-06-10 文章 ??????
Hi,
??flink??kafka??EXACTLY-ONCE??
??debug??invoketraction.producer.send()??precommit??commit
??

Re: flink如何传递全局变量

2020-06-10 文章 李杰
还有一个简单的方法,可以尝试parameterTool,当然类型偏简单。
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
parameterTool.get("paramete-key","paramete-value");
...
env.getConfig().setGlobalJobParameters(parameterTool);
...
source
.map(new RichMapFunction>() {
private ParameterTool parameterTool = null;
private String paramete = "";

@Override
public void open(Configuration parameters) throws Exception {
parameterTool = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
paramete = parameterTool.get("paramete-key");
System.out.println("paramete = " + paramete);
}

@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2<>(paramete, 1);
}
});
 ...

Px New <15701181132mr@gmail.com> 于2020年6月10日周三 上午9:53写道:

> 对
> 正如 -> 1048262223  所说的一样 , 目前我就是通过BroadCast 动态更细一些规则带到下游并在Process method 中
> 进行操作 | 
>
> zjfpla...@hotmail.com  于2020年6月9日周二 下午8:14写道:
>
> > hi,
> > 请问flink如何传递全局变量,静态类好像服务器端运行不行。
> >
>  场景是:一开始flink程序起来时,读取配置文件中的配置项,此类配置项可能会在sink,source等等其他地方用到,算是整个程序的全局配置
> >
> >
> >
> > zjfpla...@hotmail.com
> >
>