Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after deserializing the bytes is null, so the HBase cluster cannot be connected

2020-05-22 Thread shao.hongxiao
No. Testing shows that writing works; the problem only shows up when reading.




邵红晓
Email: 17611022...@163.com

(Signature customized with NetEase Mail Master)

On 2020-05-23 08:20, Jacky Lau wrote:
Did you explicitly specify the hbase conf directory in the configs.sh script?


Sent from my iPhone



-- Original Message --
From: Leonard Xu 
Sent: May 23, 2020 00:19
To: shao.hongxiao <17611022...@163.com>
Cc: user-zh 
Subject: Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object
obtained after deserializing the bytes is null, so the HBase cluster cannot be connected

Hi, hongxiao

I gave it a try against my local HBase cluster and it worked [1]; I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 


> On May 22, 2020, at 11:48, shao.hongxiao <17611022...@163.com> wrote:
>
> Below is my program
> sql
> val hbaseTable =
>   """
> |CREATE TABLE pvuv_sink(
> |user_id varchar,
> |f ROW
> |) WITH (
> |'connector.type' = 'hbase',
> |'connector.version' = '1.4.3',
> |'connector.table-name' = 'test_shx',
> |'connector.zookeeper.quorum' = 'docker-hbase:2181',
> |'connector.zookeeper.znode.parent' = '/hbase'
> |)
>   """.stripMargin.toString
>
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>  bsEnv.setParallelism(1)
> val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> bsTableEnv.sqlUpdate(hbaseTable)
> bsTableEnv.execute("SQL Job")
>
> Error
> job graph phase
> HBaseRowInputFormat.java
> this.conf = {Configuration@4841} "Configuration: core-default.xml, 
> core-site.xml, hbase-default.xml, hbase-site.xml"
> quietmode = true
> allowNullValueProperties = false
> resources = {ArrayList@4859}  size = 2
> finalParameters = {Collections$SetFromMap@4860}  size = 0
> loadDefaults = true
> updatingResource = {ConcurrentHashMap@4861}  size = 343
> properties = {Properties@4862}  size = 343
> overlay = {Properties@4863}  size = 2
> classLoader = {Launcher$AppClassLoader@4864}
>
> Executor job phase  InstantiationUtil.java readObjectFromConfig
> userCodeObject = {HBaseRowInputFormat@13658}
> tableName = "test_shx"
> schema = {HBaseTableSchema@13660}
> conf = null
> readHelper = null
> endReached = false
> table = null
> scan = null
> resultScanner = null
> currentRow = null
> scannedRows = 0
> runtimeContext = null
>
>
> Any help from the experts here would be greatly appreciated.
>
>
> 邵红晓
> Email: 17611022...@163.com
> (Signature customized with NetEase Mail Master)




Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after deserializing the bytes is null, so the HBase cluster cannot be connected

2020-05-22 Thread shao.hongxiao
Thanks. Testing shows that writes to HBase succeed but reads do not. Looking at the source code, roughly in the HBaseRowInputFormat class there is the line

private transient Configuration conf;

so conf is not serialized, which is why it comes back as null after deserialization. You should be able to reproduce it from there.
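That matches how Java serialization works. A minimal illustration (a hypothetical class, not the real HBaseRowInputFormat) of why the transient field comes back as null and why the format has to rebuild it in configure()/open() on the task side:

import java.io.*;

// Minimal sketch of why a transient field comes back as null after the job
// graph is serialized and shipped to the task managers (hypothetical class,
// not the real HBaseRowInputFormat).
public class TransientConfDemo implements Serializable {
    String tableName = "test_shx";        // survives Java serialization
    transient Object conf = new Object(); // skipped by Java serialization

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new TransientConfDemo());
        }
        TransientConfDemo copy;
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (TransientConfDemo) in.readObject();
        }
        System.out.println(copy.tableName); // test_shx
        System.out.println(copy.conf);      // null -> has to be re-created in configure()/open()
    }
}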


邵红晓
Email: 17611022...@163.com

(Signature customized with NetEase Mail Master)

On 2020-05-23 00:19, Leonard Xu wrote:
Hi, hongxiao

I gave it a try against my local HBase cluster and it worked [1]; I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 


> On May 22, 2020, at 11:48, shao.hongxiao <17611022...@163.com> wrote:
>
> Below is my program
> sql
> val hbaseTable =
>   """
> |CREATE TABLE pvuv_sink(
> |user_id varchar,
> |f ROW
> |) WITH (
> |'connector.type' = 'hbase',
> |'connector.version' = '1.4.3',
> |'connector.table-name' = 'test_shx',
> |'connector.zookeeper.quorum' = 'docker-hbase:2181',
> |'connector.zookeeper.znode.parent' = '/hbase'
> |)
>   """.stripMargin.toString
>
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>  bsEnv.setParallelism(1)
> val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> bsTableEnv.sqlUpdate(hbaseTable)
> bsTableEnv.execute("SQL Job")
>
> Error
> job graph phase
> HBaseRowInputFormat.java
> this.conf = {Configuration@4841} "Configuration: core-default.xml, 
> core-site.xml, hbase-default.xml, hbase-site.xml"
> quietmode = true
> allowNullValueProperties = false
> resources = {ArrayList@4859}  size = 2
> finalParameters = {Collections$SetFromMap@4860}  size = 0
> loadDefaults = true
> updatingResource = {ConcurrentHashMap@4861}  size = 343
> properties = {Properties@4862}  size = 343
> overlay = {Properties@4863}  size = 2
> classLoader = {Launcher$AppClassLoader@4864}
>
> Executor job phase  InstantiationUtil.java readObjectFromConfig
> userCodeObject = {HBaseRowInputFormat@13658}
> tableName = "test_shx"
> schema = {HBaseTableSchema@13660}
> conf = null
> readHelper = null
> endReached = false
> table = null
> scan = null
> resultScanner = null
> currentRow = null
> scannedRows = 0
> runtimeContext = null
>
>
> Any help from the experts here would be greatly appreciated.
>
>
> 邵红晓
> Email: 17611022...@163.com
> (Signature customized with NetEase Mail Master)



Apache Flink - Error on creating savepoints using REST interface

2020-05-22 Thread M Singh
Hi:
I am using Flink 1.6.2 and trying to create a savepoint with the following 
curl command, based on these references 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html)
 and 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html
curl -v -H "Content-Type: application/json" -XPOST 
http:///jobs//savepoints -d 
'{"target-directory":"s3://mys3bucket/savepointdirectory/testMay22-sp1/"}' 

But I am getting the following error:
{"errors":["Request did not match expected format 
SavepointTriggerRequestBody."]}

Can you please let me know what I could be missing? Thanks
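One thing worth trying (an assumption, not a confirmed fix): the SavepointTriggerRequestBody in 1.6 also carries a cancel-job flag, so sending both fields and making sure the JSON body survives shell quoting may resolve this error. A JDK-only sketch of the same request:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch: trigger a savepoint over the 1.6 REST API. Host and job id are
// placeholders; "cancel-job" is included on the assumption that the request
// body expects it next to "target-directory".
public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        String host = "localhost";   // JobManager host
        String jobId = "<job-id>";   // id of the running job
        String body = "{\"target-directory\":\"s3://mys3bucket/savepointdirectory/testMay22-sp1/\","
                + "\"cancel-job\":false}";

        URL url = new URL("http://" + host + ":8081/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}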






Does Flink use EMRFS?

2020-05-22 Thread Peter Groesbeck
Hi,

I'm using Flink StreamingFileSink running in one AWS account (A) to another
(B). I'm also leveraging a SecurityConfiguration in the CFN to assume a
role in account B so that when I write there the files are owned by account
B which then in turn allows account B to delegate to other AWS accounts (C
and D). The reason these files must be owned by the other account is
because AWS doesn't support cross account delegation:
https://docs.aws.amazon.com/AmazonS3/latest/dev/example-walkthroughs-managing-access-example4.html

SecurityConfiguration:
  Type: AWS::EMR::SecurityConfiguration
  Properties:
Name: String
SecurityConfiguration:
  AuthorizationConfiguration:
EmrFsConfiguration:
  RoleMappings:
- Role: arn:aws:iam:::role/EMR_EC2_DefaultRole
  IdentifierType: Prefix
  Identifiers:
- s3://my-bucket/prefix/
- Role: arn:aws:iam:::role/EMR_DefaultRole
  IdentifierType: Prefix
  Identifiers:
- s3://my-bucket/prefix/


I've referenced this in my Cluster block as well:

ReleaseLabel: !Ref ReleaseLabel
SecurityConfiguration: !Ref SecurityConfiguration
ScaleDownBehavior: TERMINATE_AT_TASK_COMPLETION

For some reason the files are still owned by account A. It seems like Flink
is using the old Hadoop FS implementation instead of EMRFS which should (I
believe) grant the proper ownership so that bucket permissions can apply to
the written objects and in turn delegate read permissions to accounts C, D,
etc.

Any help would be greatly appreciated.

Thanks,
Peter


Apache Flink - Question about application restart

2020-05-22 Thread M Singh
Hi Flink Folks:
If I have a Flink application with 10 restarts and it fails and restarts, then:
1. Does the job keep the same id?
2. Does the automatically restarting application pick up from the last checkpoint? I am assuming it does, but just want to confirm.
Also, if it is running on AWS EMR, I believe EMR/YARN is configured to restart 
the job 3 times (after it has exhausted its restart policy). If that is the case:
1. Does the job get a new id? I believe it does, but just want to confirm.
2. Does the YARN restart honor the last checkpoint? I believe it does not, but is 
there a way to make it restart from the last checkpoint of the failed job (after 
it has exhausted its restart policy)?
Thanks
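Not an authoritative answer to the YARN part, but on the last question: if checkpoints are retained (externalized), a resubmitted job (which gets a new job id) can be started from the latest retained checkpoint with `flink run -s <checkpoint-path>`. A sketch of the relevant settings, with example values:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: retain checkpoints so a resubmitted job (which gets a new job id) can
// be resumed from the last checkpoint with `flink run -s <checkpoint-path> ...`.
// The interval and the (commented) state backend location are examples.
public class RetainedCheckpointsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);   // checkpoint every 60 seconds
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // env.setStateBackend(new FsStateBackend("s3://bucket/checkpoints"));
        // ... build the pipeline here, then call env.execute("my-job");
    }
}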



Re: Stateful functions Harness

2020-05-22 Thread Boris Lublinsky
Also there seems to be a problem in this class public final class Modules

When I am running Harness example it fails with the error

Exception in thread "main" java.lang.IllegalStateException: There are no 
routers defined.
at 
org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
at 
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
at 
org.apache.flink.statefun.flink.harness.Harness.start(Harness.java:128)
at 
com.lightbend.flink.statefun.examples.greeter.HarnessRunner.main(HarnessRunner.java:18)

Which seems to be due to the fact that the method:

public static Modules loadFromClassPath() {

returns no stateful functions, just IO modules (see the discovery check below).
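If I recall correctly, loadFromClassPath() discovers application modules through Java's ServiceLoader, so a quick way to see what the IDE classpath actually exposes is a sketch like this (plain JDK plus the statefun SDK interface):

import java.util.ServiceLoader;

import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

// Diagnostic sketch: print every StatefulFunctionModule that ServiceLoader can
// see on the current classpath. The distribution's IO modules should show up
// together with the application's GreetingModule.
public class ModuleDiscoveryCheck {
    public static void main(String[] args) {
        for (StatefulFunctionModule module : ServiceLoader.load(StatefulFunctionModule.class)) {
            System.out.println(module.getClass().getName());
        }
    }
}

If GreetingModule does not appear in the output, a likely cause is that META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule is missing from the IDE classpath; the examples rely on @AutoService to generate that file, which requires annotation processing to be enabled in the IDE build.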






> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> Sorry, I need to correct my comment on using the Kafka ingress / egress with 
> the Harness.
> 
> That is actually doable, by adding an extra dependency to 
> `statefun-flink-distribution` in your Harness program.
> That pulls in all the other required dependencies required by the Kafka 
> ingress / egress, such as the source / sink providers and Flink Kafka 
> connectors.
> 
> Cheers,
> Gordon
> 
> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai  > wrote:
> Are you getting an exception from running the Harness?
> The Harness should already have the required configurations, such as the 
> parent first classloading config.
> 
> Otherwise, if you would like to add your own configuration, use the 
> `withConfiguration` method on the `Harness` class.
> 
> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky 
> mailto:boris.lublin...@lightbend.com>> wrote:
> Also, where do I put flink-conf.yaml in IDEA to add the additional required 
> config parameter:
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> 
> 
>> On May 21, 2020, at 12:22 PM, Boris Lublinsky > > wrote:
>> 
>> Hi, 
>> I am trying to run 
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>  
>> 
>>  locally
>> using 
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>  
>> 
>>  
>> And have several questions.
>> 1. It seems fairly straightforward to use it with in-memory message 
>> generators, but I can’t figure out how to add a Kafka ingress/egress so that I 
>> can use it with Kafka.
>> 2. GreetingModule already creates a StatefulFunctionUniverse and so does 
>> Harness. Is there a way to short-circuit it and have Harness get the 
>> StatefulFunctionUniverse directly?
>> 3. Is there an example of how to write a Flink main for a stateful function?
>> 4. Is there an example anywhere of how to run such examples in the IDE with 
>> Kafka?
>> 5 There is a great stateful functions example 
>> https://github.com/ververica/flink-statefun-workshop 
>> , but its readme does 
>> not really describe implementation and neither does this article, 
>> referencing it 
>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39 
>> . Is 
>> there anything that describes this implementation?
>> 
> 



Re: kerberos integration with flink

2020-05-22 Thread Nick Bendtner
Hi Guo,
Even for HDFS I don't really need to set "security.kerberos.login.contexts"
. As long as there is the right ticket in the ticket cache before starting
the flink cluster it seems to work fine. I think even [4] from your
reference seems to do the same thing. I have defined my own ticket
cache specifically for the Flink cluster by setting this environment variable.
Before starting the cluster I create a ticket by using kinit.
This is how I make flink read this cache.
export KRB5CCNAME=/home/was/Jaas/krb5cc . I think even flink tries to find
the location of ticket cache using this variable [1].
Do you see any problems in setting up the Hadoop security module this way? And
thanks a lot for your help.

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java

Best,
Nick
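For comparison, the keytab-based setup described in the earlier reply boils down to a handful of configuration entries. A sketch of the equivalent programmatic Configuration (the keys are the documented Flink security options; paths, principal and contexts are placeholders):

import org.apache.flink.configuration.Configuration;

// Sketch only: keytab-based Kerberos setup as Flink configuration. The same
// keys can go into flink-conf.yaml; paths, principal and contexts are placeholders.
public class KerberosConfSketch {
    public static Configuration kerberosConfig() {
        Configuration conf = new Configuration();
        conf.setString("security.kerberos.login.use-ticket-cache", "false");
        conf.setString("security.kerberos.login.keytab", "/path/to/flink.keytab");
        conf.setString("security.kerberos.login.principal", "flink-user@EXAMPLE.COM");
        // decides which JAAS applications (Kafka, ZooKeeper, ...) get this identity
        conf.setString("security.kerberos.login.contexts", "Client,KafkaClient");
        return conf;
    }
}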



On Thu, May 21, 2020 at 9:54 PM Yangze Guo  wrote:

> Hi, Nick,
>
> From my understanding, if you configure the
> "security.kerberos.login.keytab", Flink will add the
> AppConfigurationEntry of this keytab to all the apps defined in
> "security.kerberos.login.contexts". If you define
> "java.security.auth.login.config" at the same time, Flink will also
> keep the configuration in it. For more details, see [1][2].
>
> If you want to use this keytab to interact with HDFS, HBase and Yarn,
> you need to set "security.kerberos.login.contexts". See [3][4].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
>
> Best,
> Yangze Guo
>
> On Thu, May 21, 2020 at 11:06 PM Nick Bendtner  wrote:
> >
> > Hi guys,
> > Is there any difference in providing kerberos config to the flink jvm
> using this method in the flink configuration?
> >
> > env.java.opts:  -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
> -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
> >
> > Is there any difference in doing it this way vs providing it from
> security.kerberos.login.keytab .
> >
> > Best,
> >
> > Nick.
>


Re: Flink Dashboard UI Tasks hard limit

2020-05-22 Thread Vijay Balakrishnan
Hi Xintong,
Thx for your reply.  Increasing network memory buffers (fraction, min, max)
seems to increase tasks slightly.

Streaming job
Standalone

Vijay

On Fri, May 22, 2020 at 2:49 AM Xintong Song  wrote:

> Hi Vijay,
>
> I don't think your problem is related to number of opening files. The
> parallelism of your job is decided before actually tries to open the files.
> And if the OS limit for opening files is reached, you should see a job
> execution failure, instead of a success execution with a lower parallelism.
>
> Could you share some more information about your use case?
>
>- What kind of job are your executing? Is it a streaming or batch
>processing job?
>- Which Flink deployment do you use? Standalone? Yarn?
>- It would be helpful if you can share the Flink logs.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I have increased the number of slots available but the Job is not using
>> all the slots but runs into this approximate 18000 Tasks limit. Looking
>> into the source code, it seems to be opening file -
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>> So, do I have to tune the ulimit or something similar at the Ubuntu O/S
>> level to increase number of tasks available ? What I am confused about is
>> the ulimit is per machine but the ExecutionGraph is across many machines ?
>> Please pardon my ignorance here. Does number of tasks equate to number of
>> open files. I am using 15 slots per TaskManager on AWS m5.4xlarge which has
>> 16 vCPUs.
>>
>> TIA.
>>
>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>>
>>> Flink Dashboard UI seems to show tasks having a hard limit for Tasks
>>> column around 18000 on a Ubuntu Linux box.
>>> I kept increasing the number of slots per task manager to 15 and number
>>> of slots increased to 705 but the slots to tasks
>>> stayed at around 18000. Below 18000 tasks, the Flink Job is able to
>>> start up.
>>> Even though I increased the number of slots, it still works when 312
>>> slots are being used.
>>>
>>> taskmanager.numberOfTaskSlots: 15
>>>
>>> What knob can I tune to increase the number of Tasks ?
>>>
>>> Pls find attached the Flink Dashboard UI.
>>>
>>> TIA,
>>>
>>>


Re: Local test of Flink 1.10 HBase SQL CREATE TABLE: the conf of the object obtained after deserializing the bytes is null, so the HBase cluster cannot be connected

2020-05-22 Thread Leonard Xu
Hi, hongxiao

I gave it a try against my local HBase cluster and it worked [1]; I could not reproduce your problem. What does your HBase cluster environment look like?

Best,
Leonard Xu
[1] 
https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/java/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.java#L33
 


> On May 22, 2020, at 11:48, shao.hongxiao <17611022...@163.com> wrote:
> 
> Below is my program
> sql
> val hbaseTable =
>   """
> |CREATE TABLE pvuv_sink(
> |user_id varchar,
> |f ROW
> |) WITH (
> |'connector.type' = 'hbase',
> |'connector.version' = '1.4.3',
> |'connector.table-name' = 'test_shx',
> |'connector.zookeeper.quorum' = 'docker-hbase:2181',
> |'connector.zookeeper.znode.parent' = '/hbase'
> |)
>   """.stripMargin.toString
> 
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>  bsEnv.setParallelism(1)
> val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> bsTableEnv.sqlUpdate(hbaseTable)
> bsTableEnv.execute("SQL Job")
> 
> Error
> job graph phase
> HBaseRowInputFormat.java 
> this.conf = {Configuration@4841} "Configuration: core-default.xml, 
> core-site.xml, hbase-default.xml, hbase-site.xml"
> quietmode = true
> allowNullValueProperties = false
> resources = {ArrayList@4859}  size = 2
> finalParameters = {Collections$SetFromMap@4860}  size = 0
> loadDefaults = true
> updatingResource = {ConcurrentHashMap@4861}  size = 343
> properties = {Properties@4862}  size = 343
> overlay = {Properties@4863}  size = 2
> classLoader = {Launcher$AppClassLoader@4864} 
> 
> Executor job phase  InstantiationUtil.java readObjectFromConfig
> userCodeObject = {HBaseRowInputFormat@13658} 
> tableName = "test_shx"
> schema = {HBaseTableSchema@13660} 
> conf = null
> readHelper = null
> endReached = false
> table = null
> scan = null
> resultScanner = null
> currentRow = null
> scannedRows = 0
> runtimeContext = null
> 
> 
> Any help from the experts here would be greatly appreciated.
> 
>   
> 邵红晓
> Email: 17611022...@163.com
> (Signature customized with NetEase Mail Master)



Re: Why Flink Connector JDBC does't support LocalDateTime ?

2020-05-22 Thread Leonard Xu
Hi, forideal
Thanks for your report,

As Jingsong said, we do not support LocalDateTime in the JDBC connector yet; I
created a JIRA [1] to track this.


Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-17890 
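Until FLINK-17890 is resolved there is no switch to make the connector accept LocalDateTime. For reference (and as a building block for a custom workaround, which is an assumption on my side rather than an official fix), the two Java types involved in the ClassCastException convert into each other directly:

import java.sql.Timestamp;
import java.time.LocalDateTime;

// Sketch: lossless conversion between the two timestamp representations from the
// reported ClassCastException (java.time.LocalDateTime vs java.sql.Timestamp).
public class TimestampConversion {
    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.parse("2020-05-22T14:00");
        Timestamp ts = Timestamp.valueOf(ldt);      // LocalDateTime -> java.sql.Timestamp
        LocalDateTime back = ts.toLocalDateTime();  // java.sql.Timestamp -> LocalDateTime
        System.out.println(ts + " / " + back);
    }
}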


> On May 22, 2020, at 22:07, Jingsong Li wrote:
> 
> Hi forideal,
> 
> Just because we don't have time to support it. We just support LocalDateTime 
> in Flink after 1.9.
> 
> Welcome to contribute.
> 
> Best,
> Jingsong Lee
> 
> On Fri, May 22, 2020 at 2:48 PM forideal  > wrote:
> Hello, my friends
> 
>   env: Flink 1.10, Blink Planner   
>  table source
>  CREATE TABLE josn_table ( order_id VARCHAR, event_time TIMESTAMP(3), 
> proc_time AS PROCTIME() ) WITH (
> 'connector.properties.0.key' = 'bootstrap.servers',
>  'connector.properties.0.value' = 'localhost:9092',
>  'connector.property-version' = '1',
>  'connector.startup-mode' = 'earliest-offset',
>  'connector.topic' = 'raw',
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'format.derive-schema' = 'true',
>  'format.property-version' = '1',
>  'format.type' = 'json',
>  'update-mode' = 'append'
> )
> mysql dim table 
> CREATE TABLE ilms_box_d_order ( id VARCHAR, event_time TIMESTAMP(3)) WITH (
>  'connector.type' = 'jdbc',
>  'connector.url' = 'jdbc:mysql://localhost/mydb',
>  'connector.table' = 'test',
>  'connector.driver' = 'com.mysql.jdbc.Driver',
>  'connector.username' = 'test',
>  'connector.password' = 'test',
>  'connector.property-version' = '1'
> );
> DML
> INSERT INTO console_test SELECT
> t1. event_time,
> order_id
> FROM
>  josn_table
>  LEFT JOIN ilms_box_d_order FOR SYSTEM_TIME AS OF josn_table.proc_time AS t1 
> ON josn_table.order_id = t1.id  and  josn_table.event_time = 
> t1.event_time;
> When I execute this SQL, I get the following exception.
> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot 
> be cast to java.sql.Timestamp, field index: 1, field value: 2020-05-22T14:00.
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L236
>  
> 
> 
> Why don't we support LocalDateTime? 
> 
> Best wishes.
> forideal
> 
> 
> 
> 
> 
>  
> 
> 
> -- 
> Best, Jingsong Lee



Query Rest API from IDE during runtime

2020-05-22 Thread Annemarie Burger
Hi,

I want to query Flink's REST API in my IDE during runtime in order to get
the jobID of the job that is currently running. Is there any way to do this?
I found the RestClient class, but can't seem to figure out how to exactly
make this work. Any help much appreciated. 

Best,
Annemarie
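A JDK-only sketch of one way to do it (assuming the cluster's REST endpoint is reachable, e.g. a standalone cluster on the default port 8081, or a local environment started with StreamExecutionEnvironment.createLocalEnvironmentWithWebUI): GET /jobs returns a small JSON document with each job id and its status, from which the RUNNING job can be picked out. RestClusterClient from flink-clients is an alternative if adding that dependency is acceptable.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch: list the jobs known to the JobManager REST endpoint. Host and port are
// placeholders; the response looks like {"jobs":[{"id":"<job-id>","status":"RUNNING"}]},
// so the currently running job id can be picked out of it.
public class ListRunningJobs {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8081/jobs");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder json = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
            System.out.println(json);
        }
    }
}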



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Updating only part of the fields in a sink table

2020-05-22 Thread Leonard Xu
Hi, naturalfree

The Elasticsearch sink in Flink SQL supports both append mode and upsert mode [1]; in upsert mode it updates rows by primary key. You can take a look.


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connect.html#elasticsearch-connector
 


> On May 20, 2020, at 16:07, naturalfree wrote:
> 
> I have an Elasticsearch index and want to update only some of its fields by primary key through Flink SQL. Is there a feasible way to do this?
> 
> 
> naturalfree
> Email: naturalf...@126.com
> 
> (Signature customized with NetEase Mail Master)



onTimer method in CoProcessFunction in flink

2020-05-22 Thread Jaswin Shah
How can I identify the type of element for which onTimer is called in Flink?
I want to store the objects for which onTimer is called in side outputs and then 
stream the side-output data out to a Kafka topic. I am not sure how to stream out 
the side-output data, i.e. where I should write that processing logic (a rough 
sketch is added after the code below). Below is the code snippet I have so far.


/**
 * CoProcessFuntion to process cart and pg messages connected using connect 
operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM 
jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction<String, CartMessage, PaymentNotifyRequestWrapper, ResultMessage> { // key type assumed to be the orderId+mid String

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState<String, CartMessage> cartState = null;

/**
 * Map state for pg messages, orderId+mid is key and pgMessage is value.
 */
private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;


/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {
MapStateDescriptor<String, CartMessage> cartStateDescriptor = new 
MapStateDescriptor<>(
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);

MapStateDescriptor<String, PaymentNotifyRequestWrapper> 
pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}

/**
 *
 * @return
 */

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<ResultMessage> out) throws Exception {

}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector<ResultMessage> collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);

String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = 
pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector<ResultMessage> collector) throws Exception {

context.timerService().registerEventTimeTimer(context.timestamp()+360);
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}

/**
 * Create ResultMessage from cart and pg messages.
 *
 * @param cartMessage
 * @param pgMessage
 * @return
 */
private void generateResultMessage(CartMessage cartMessage, 
PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;

//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT, 
pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, 
pay.getProvider())) {
payment = pay;
break;
}
}
if(Objects.isNull(payment)) {
return;
}

resultMessage.setOrderId(cartMessage.getId());
resultMessage.setMid(payment.getMid());


resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
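One way to wire this up (a rough sketch; the OutputTag names, the sink variables, and the assumption that the stream is keyed by the same orderId+mid string used as the MapState key are all made up for illustration): emit the still-buffered object from onTimer through ctx.output(...), and attach the Kafka sinks to getSideOutput(...) where the job graph is built, not inside the function. Which stream the expired key belongs to follows from which of the two map states still holds an entry for it.

// Rough sketch, inside CartPGCoprocessFunction. Needs: import org.apache.flink.util.OutputTag;
private static final OutputTag<CartMessage> UNMATCHED_CART =
        new OutputTag<CartMessage>("unmatched-cart") {};
private static final OutputTag<PaymentNotifyRequestWrapper> UNMATCHED_PG =
        new OutputTag<PaymentNotifyRequestWrapper>("unmatched-pg") {};

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
    String key = ctx.getCurrentKey();      // the key the expired timer was registered for
    CartMessage cart = cartState.get(key);
    if (cart != null) {                    // timed out waiting for the PG message
        ctx.output(UNMATCHED_CART, cart);
        cartState.remove(key);
    }
    PaymentNotifyRequestWrapper pg = pgState.get(key);
    if (pg != null) {                      // timed out waiting for the cart message
        ctx.output(UNMATCHED_PG, pg);
        pgState.remove(key);
    }
}

// Where the job graph is built (outside the function), the side outputs are
// obtained from the operator that runs the CoProcessFunction and sunk to Kafka:
// SingleOutputStreamOperator<ResultMessage> matched =
//     cartStream.connect(pgStream).keyBy(cartKeySelector, pgKeySelector)
//               .process(new CartPGCoprocessFunction());
// matched.getSideOutput(UNMATCHED_CART).addSink(unmatchedCartKafkaProducer);
// matched.getSideOutput(UNMATCHED_PG).addSink(unmatchedPgKafkaProducer);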


Re: Reading files under an HDFS directory with a pattern in Flink

2020-05-22 Thread Jingsong Li
Hi,

In 1.11 we already have fairly complete filesystem support, including partitions (csv, json, avro, parquet, orc).

More flexible reads are targeted for 1.12; there is already an issue for it:
https://issues.apache.org/jira/browse/FLINK-17398

Best,
Jingsong Lee

On Fri, May 22, 2020 at 10:47 AM 阿华田  wrote:

> input_data = "hdfs://localhost:9002/tmp/match_bak/%s*[0-9]" %
> ('2018-07-16’)
> result = sc.textFile(input_data)
> Can Flink read the files under an HDFS directory with a pattern, the way Spark does? My tests so far suggest it cannot. If it is not supported, what is the earliest version that will support it?
>
>
> 王志华
> a15733178...@163.com
> (Signature customized with NetEase Mail Master)
>
>

-- 
Best, Jingsong Lee


Re: Why Flink Connector JDBC does't support LocalDateTime ?

2020-05-22 Thread Jingsong Li
Hi forideal,

Just because we don't have time to support it. We just support
LocalDateTime in Flink after 1.9.

Welcome to contribute.

Best,
Jingsong Lee

On Fri, May 22, 2020 at 2:48 PM forideal  wrote:

> Hello, my friends
>
>   env: Flink 1.10, Blink Planner
>  table source
>
>  CREATE TABLE josn_table ( order_id VARCHAR, event_time TIMESTAMP(3), 
> proc_time AS PROCTIME() ) WITH (
> 'connector.properties.0.key' = 'bootstrap.servers',
>  'connector.properties.0.value' = 'localhost:9092',
>  'connector.property-version' = '1',
>  'connector.startup-mode' = 'earliest-offset',
>  'connector.topic' = 'raw',
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'format.derive-schema' = 'true',
>  'format.property-version' = '1',
>  'format.type' = 'json',
>  'update-mode' = 'append'
> )
>
> mysql dim table
>
> CREATE TABLE ilms_box_d_order ( id VARCHAR, event_time TIMESTAMP(3)) WITH (
>  'connector.type' = 'jdbc',
>  'connector.url' = 'jdbc:mysql://localhost/mydb',
>  'connector.table' = 'test',
>  'connector.driver' = 'com.mysql.jdbc.Driver',
>  'connector.username' = 'test',
>  'connector.password' = 'test',
>  'connector.property-version' = '1'
> );
>
> DML
>
> INSERT INTO console_test SELECT
> t1. event_time,
> order_id
> FROM
>  josn_table
>  LEFT JOIN ilms_box_d_order FOR SYSTEM_TIME AS OF josn_table.proc_time AS t1 
> ON josn_table.order_id = t1.id and  josn_table.event_time = t1.event_time;
>
> When I execute this SQL, I get the following exception.
> Caused by: java.lang.ClassCastException: java.time.LocalDateTime
> cannot be cast to java.sql.Timestamp, field index: 1, field value:
> 2020-05-22T14:00.
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L236
>
> Why don't we support LocalDateTime?
>
> Best wishes.
> forideal
>
>
>
>
>
>
>


-- 
Best, Jingsong Lee


Re: How do I get the IP of the master and slave files programmatically in Flink?

2020-05-22 Thread Felipe Gutierrez
thanks. it worked!

I add the following method at the
org.apache.flink.streaming.api.operators.StreamingRuntimeContext
class:

public Environment getTaskEnvironment() { return this.taskEnvironment; }

Then I am getting the IP using:

ConfigOption<String> restAddressOption = ConfigOptions
   .key("rest.address")
   .stringType()
   .noDefaultValue();
String restAddress =
this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption);

Thanks!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, May 22, 2020 at 3:54 AM Yangze Guo  wrote:
>
> Hi, Felipe
>
> I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
> could indeed get all the configurations(including what you defined in
> flink-conf.yaml) through
> "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
> However, I guess it is not the right behavior and might be fixed in
> future versions.
>
> Best,
> Yangze Guo
>
>
>
> On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez
>  wrote:
> >
> > Hi all,
> >
> > I would like to have the IP of the JobManager, not the Task Executors.
> > I explain why.
> >
> > I have an operator (my own operator that extends
> > AbstractUdfStreamOperator) that sends and receives messages from a
> > global controller. So, regardless of which TaskManager these operator
> > instances are deployed, they need to send and receive messages from my
> > controller. Currently, I am doing this using MQTT broker (this is my
> > first approach and I don't know if there is a better way to do it,
> > maybe there is...)
> >
> > The first thing that I do is to start my controller using the
> > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
> > it to the JobManager host. I am getting the IP of the JobManager by
> > adding this method on the
> > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
> > class:
> >public String getRpcServiceAddress() {
> > return this.rpcService.getAddress();
> > }
> > That is working. Although I am not sure if it is the best approach.
> >
> > The second thing that I am doing is to make each operator instance
> > publish and subscribe to this controller. To do this they need the
> > JobManager IP. I could get the TaskManager IPs from the
> > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
> > the JobManager IP as a parameter to the operator at the moment. I
> > suppose that it is easy to get the JobManager IP inside the
> > AbstractUdfStreamOperator or simply add some method somewhere to get
> > this value. However, I don't know where.
> >
> > Thanks,
> > Felipe
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Thu, May 21, 2020 at 7:13 AM Yangze Guo  wrote:
> > >
> > > Hi, Felipe
> > >
> > > Do you mean to get the Host and Port of the task executor where your
> > > operator is indeed running on?
> > >
> > > If that is the case, IIUC, two possible components that contain this
> > > information are RuntimeContext and the Configuration param of
> > > RichFunction#open. After reading the relevant code path, it seems you
> > > could not get it at the moment.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> > >  wrote:
> > > >
> > > > Hi Felippe,
> > > >
> > > > could you clarify in some more details what you are trying to achieve?
> > > >
> > > > Best regards,
> > > >
> > > > --
> > > >
> > > > Alexander Fedulov | Solutions Architect
> > > >
> > > > +49 1514 6265796
> > > >
> > > >
> > > >
> > > > Follow us @VervericaData
> > > >
> > > > --
> > > >
> > > > Join Flink Forward - The Apache Flink Conference
> > > >
> > > > Stream Processing | Event Driven | Real Time
> > > >
> > > > --
> > > >
> > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > >
> > > > --
> > > >
> > > > Ververica GmbH
> > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
> > > > (Tony) Cheng
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez 
> > > >  wrote:
> > > >>
> > > >> Hi all,
> > > >>
> > > >> I have my own operator that extends the AbstractUdfStreamOperator
> > > >> class and I want to issue some messages to it. Sometimes the operator
> > > >> instances are deployed on different TaskManagers and I would like to
> > > >> set some attributes like the master and slave IPs on it.
> > > >>
> > > >> I am trying to use these values but they only return localhost, not
> > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
> > > >> 192.168.56.1).
> > > >>
> > > >> ConfigOption restAddressOption = ConfigOptions
> > > >>.key("rest.address")
> > > >>.stringType()
> > > >>.noDefaultValue();

Re: Stateful functions Harness

2020-05-22 Thread Boris Lublinsky
Thanks,
I will try your suggestions.
One more question.
Is StatefulFunctionJob the one that is used for Stateful function execution in 
the docker case?



> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> Sorry, I need to correct my comment on using the Kafka ingress / egress with 
> the Harness.
> 
> That is actually doable, by adding an extra dependency to 
> `statefun-flink-distribution` in your Harness program.
> That pulls in all the other required dependencies required by the Kafka 
> ingress / egress, such as the source / sink providers and Flink Kafka 
> connectors.
> 
> Cheers,
> Gordon
> 
> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai  > wrote:
> Are you getting an exception from running the Harness?
> The Harness should already have the required configurations, such as the 
> parent first classloading config.
> 
> Otherwise, if you would like to add your own configuration, use the 
> `withConfiguration` method on the `Harness` class.
> 
> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky 
> mailto:boris.lublin...@lightbend.com>> wrote:
> Also, where do I put flink-conf.yaml in IDEA to add the additional required 
> config parameter:
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> 
> 
>> On May 21, 2020, at 12:22 PM, Boris Lublinsky > > wrote:
>> 
>> Hi, 
>> I am trying to run 
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>  
>> 
>>  locally
>> using 
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>  
>> 
>>  
>> And have several questions.
>> 1. It seems fairly straightforward to use it with in-memory message 
>> generators, but I can’t figure out how to add a Kafka ingress/egress so that I 
>> can use it with Kafka.
>> 2. GreetingModule already creates a StatefulFunctionUniverse and so does 
>> Harness. Is there a way to short-circuit it and have Harness get the 
>> StatefulFunctionUniverse directly?
>> 3. Is there an example of how to write a Flink main for a stateful function?
>> 4. Is there an example anywhere of how to run such examples in the IDE with 
>> Kafka?
>> 5 There is a great stateful functions example 
>> https://github.com/ververica/flink-statefun-workshop 
>> , but its readme does 
>> not really describe implementation and neither does this article, 
>> referencing it 
>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39 
>> . Is 
>> there anything that describes this implementation?
>> 
> 



java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-22 Thread wangl...@geekplus.com.cn

public class MyKafkaSerializationSchema implements 
KafkaSerializationSchema<Tuple2<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o, 
            @Nullable Long aLong) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
                o.f1.getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new 
FlinkKafkaProducer<>(
        "default", new MyKafkaSerializationSchema(),
        prop2, Semantic.EXACTLY_ONCE);

But there's  error when runnng:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei


wangl...@geekplus.com.cn 



Re: Writing to SQL server

2020-05-22 Thread Martin Frank Hansen
Well got it working.

The varchars in the database were set too small.

Thanks for your help!
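For the archive, a condensed sketch of the generic JDBC output format wiring discussed in this thread (Flink 1.10 API; the URL, credentials, query and column types are placeholders, and as noted above the actual culprit was undersized varchar columns rather than the connector):

import java.sql.Types;

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Sketch of writing a DataStream<Row> to SQL Server through the generic JDBC
// output format (Flink 1.10 API). URL, credentials, query and SQL types are
// placeholders; the SQL Server JDBC driver jar has to be on the job classpath.
public class SqlServerSinkSketch {
    public static void addSink(DataStream<Row> rows) {
        JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .setDBUrl("jdbc:sqlserver://<host>:1433;databaseName=<db>")
                .setUsername("<user>")
                .setPassword("<password>")
                .setQuery("INSERT INTO my_table (id, payload) VALUES (?, ?)")
                .setSqlTypes(new int[] {Types.VARCHAR, Types.VARCHAR}) // target columns must be wide enough
                .finish();
        rows.writeUsingOutputFormat(outputFormat);
    }
}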

Den fre. 22. maj 2020 kl. 13.30 skrev Martin Frank Hansen <
m...@berlingskemedia.dk>:

> Arh ok thanks, no problem.
>
> My problem is now that nothing is sent, do I need to format it in another
> way? Or did I miss something else?
>
> I tried to include Class.forName(
> "com.microsoft.sqlserver.jdbc.SQLServerDriver")  but that didn't work.
>
> Den fre. 22. maj 2020 kl. 11.57 skrev Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> No sorry, you're right. The JDBCOutputFormat should work..I get confused
>> with the Table API
>>
>> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
>> m...@berlingskemedia.dk> wrote:
>>
>>> Hi again,
>>>
>>> I am a bit confused as to why the generic jdbc connector would not work
>>> with sql-server?
>>>
>>> Can you explain a bit more?
>>>
>>>
>>> Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
>>> m...@berlingskemedia.dk>:
>>>
 Hi Flavio,

 Thanks for your reply. I will try another way then.


 Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
 pomperma...@okkam.it>:

> I expect you to see some exception somewhere, that sql server dialect
> is not supported yet.
>
> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi Flavio,
>>
>> Thank you so much! Thought i had that import but misread it.
>>
>> The code does not give any errors, but no data is written to the sql
>> server. Can you see why that is?
>>
>>
>>
>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> SQL server should not be supported from what I know..for this I
>>> opened a PR[1] that I should rebase.
>>> If someone is interested in I could do it
>>>
>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>>
>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther 
>>> wrote:
>>>
 Hi Martin,

 usually, this error occurs when people forget to add
 `org.apache.flink.api.scala._` to their imports. It is triggered by
 the
 Scala macro that the DataStream API uses for extracting types.

 Can you try to call `result.toAppendStream[Row]` directly? This
 should
 work if you import `org.apache.flink.table.api.scala._`.

 Maybe this example helps:


 https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

 Regards,
 Timo


 On 22.05.20 08:02, Martin Frank Hansen wrote:
 > Hi,
 >
 > I am trying to write input from Kafka to a SQL server on AWS, but
 I have
 > difficulties.
 >
 > I get the following error could not find implicit value for
 evidence
 > parameter of type
 >
 org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
 > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
 > [error]   ^
 >
 > Any help is appreciated
 >
 > I am not sure whether my approach is correct or not but my code
 is
 > as follows:
 >
 > import java.util.Properties
 >
 > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
 > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
 JDBCOutputFormat}
 > import
 org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
 FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
 > import org.apache.flink.streaming.api.scala._
 > import org.apache.flink.api.scala._
 > import
 org.apache.flink.api.common.serialization.{SimpleStringEncoder,
 SimpleStringSchema}
 > import org.apache.flink.table.api.scala.StreamTableEnvironment
 > import org.apache.flink.table.api.{EnvironmentSettings, Table,
 TableEnvironment, Types}
 > import org.apache.flink.types.Row
 >
 >val properties =new Properties()
 >properties.setProperty("bootstrap.servers",b_servers)
 >properties.setProperty("zookeeper.connect",zk)
 >properties.setProperty("group.id ",
 "very_small_test")
 >properties.setProperty("ssl.endpoint.identification.algorithm
 ", "")
 >properties.setProperty("security.protocol", "SSL")
 >
 >
 >val kafkaSource: FlinkKafkaConsumerBase[String] =new
 FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
 properties).setStartFromTimestamp(0)

flink + prometheus integration issue

2020-05-22 Thread guaishushu1...@163.com
My custom Flink metrics show up through the Flink REST API, but Prometheus does not pick them up. Has anyone run into this problem?
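For context, a sketch of how a user-defined metric is usually registered. If a metric like this is visible in the REST API but not in Prometheus, the first things to check are the reporter configuration in flink-conf.yaml and whether Prometheus scrapes every TaskManager on the reporter port range (the config keys in the comment are the documented ones, the values are examples):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Sketch: a user-defined counter registered through the RuntimeContext. It is
// exposed both over the REST API and through the PrometheusReporter, provided
// flink-conf.yaml configures the reporter, e.g. (example values):
//   metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
//   metrics.reporter.prom.port: 9250-9260
// and Prometheus scrapes every TaskManager host on that port range.
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}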



guaishushu1...@163.com


Re: Writing to SQL server

2020-05-22 Thread Flavio Pompermaier
Is the sql-server jdbc jar in the flink dist lib folder?

On Fri, May 22, 2020 at 1:30 PM Martin Frank Hansen 
wrote:

> Arh ok thanks, no problem.
>
> My problem is now that nothing is sent, do I need to format it in another
> way? Or did I miss something else?
>
> I tried to include Class.forName(
> "com.microsoft.sqlserver.jdbc.SQLServerDriver")  but that didn't work.
>
> Den fre. 22. maj 2020 kl. 11.57 skrev Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> No sorry, you're right. The JDBCOutputFormat should work..I get confused
>> with the Table API
>>
>> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
>> m...@berlingskemedia.dk> wrote:
>>
>>> Hi again,
>>>
>>> I am a bit confused as to why the generic jdbc connector would not work
>>> with sql-server?
>>>
>>> Can you explain a bit more?
>>>
>>>
>>> Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
>>> m...@berlingskemedia.dk>:
>>>
 Hi Flavio,

 Thanks for your reply. I will try another way then.


 Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
 pomperma...@okkam.it>:

> I expect you to see some exception somewhere, that sql server dialect
> is not supported yet.
>
> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi Flavio,
>>
>> Thank you so much! Thought i had that import but misread it.
>>
>> The code does not give any errors, but no data is written to the sql
>> server. Can you see why that is?
>>
>>
>>
>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> SQL server should not be supported from what I know..for this I
>>> opened a PR[1] that I should rebase.
>>> If someone is interested in I could do it
>>>
>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>>
>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther 
>>> wrote:
>>>
 Hi Martin,

 usually, this error occurs when people forget to add
 `org.apache.flink.api.scala._` to their imports. It is triggered by
 the
 Scala macro that the DataStream API uses for extracting types.

 Can you try to call `result.toAppendStream[Row]` directly? This
 should
 work if you import `org.apache.flink.table.api.scala._`.

 Maybe this example helps:


 https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

 Regards,
 Timo


 On 22.05.20 08:02, Martin Frank Hansen wrote:
 > Hi,
 >
 > I am trying to write input from Kafka to a SQL server on AWS, but
 I have
 > difficulties.
 >
 > I get the following error could not find implicit value for
 evidence
 > parameter of type
 >
 org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
 > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
 > [error]   ^
 >
 > Any help is appreciated
 >
 > I am not sure whether my approach is correct or not but my code
 is
 > as follows:
 >
 > import java.util.Properties
 >
 > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
 > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
 JDBCOutputFormat}
 > import
 org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
 FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
 > import org.apache.flink.streaming.api.scala._
 > import org.apache.flink.api.scala._
 > import
 org.apache.flink.api.common.serialization.{SimpleStringEncoder,
 SimpleStringSchema}
 > import org.apache.flink.table.api.scala.StreamTableEnvironment
 > import org.apache.flink.table.api.{EnvironmentSettings, Table,
 TableEnvironment, Types}
 > import org.apache.flink.types.Row
 >
 >val properties =new Properties()
 >properties.setProperty("bootstrap.servers",b_servers)
 >properties.setProperty("zookeeper.connect",zk)
 >properties.setProperty("group.id ",
 "very_small_test")
 >properties.setProperty("ssl.endpoint.identification.algorithm
 ", "")
 >properties.setProperty("security.protocol", "SSL")
 >
 >
 >val kafkaSource: FlinkKafkaConsumerBase[String] =new
 FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
 properties).setStartFromTimestamp(0)
 >
 >val settings =
 

Re: Writing to SQL server

2020-05-22 Thread Martin Frank Hansen
Arh ok thanks, no problem.

My problem is now that nothing is sent, do I need to format it in another
way? Or did I miss something else?

I tried to include Class.forName(
"com.microsoft.sqlserver.jdbc.SQLServerDriver")  but that didn't work.

Den fre. 22. maj 2020 kl. 11.57 skrev Flavio Pompermaier <
pomperma...@okkam.it>:

> No sorry, you're right. The JDBCOutputFormat should work..I get confused
> with the Table API
>
> On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi again,
>>
>> I am a bit confused as to why the generic jdbc connector would not work
>> with sql-server?
>>
>> Can you explain a bit more?
>>
>>
>> Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
>> m...@berlingskemedia.dk>:
>>
>>> Hi Flavio,
>>>
>>> Thanks for your reply. I will try another way then.
>>>
>>>
>>> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
>>> pomperma...@okkam.it>:
>>>
 I expect you to see some exception somewhere, that sql server dialect
 is not supported yet.

 On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
 m...@berlingskemedia.dk> wrote:

> Hi Flavio,
>
> Thank you so much! Thought i had that import but misread it.
>
> The code does not give any errors, but no data is written to the sql
> server. Can you see why that is?
>
>
>
> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> SQL server should not be supported from what I know..for this I
>> opened a PR[1] that I should rebase.
>> If someone is interested in I could do it
>>
>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>
>> On Fri, May 22, 2020 at 8:35 AM Timo Walther 
>> wrote:
>>
>>> Hi Martin,
>>>
>>> usually, this error occurs when people forget to add
>>> `org.apache.flink.api.scala._` to their imports. It is triggered by
>>> the
>>> Scala macro that the DataStream API uses for extracting types.
>>>
>>> Can you try to call `result.toAppendStream[Row]` directly? This
>>> should
>>> work if you import `org.apache.flink.table.api.scala._`.
>>>
>>> Maybe this example helps:
>>>
>>>
>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>> > Hi,
>>> >
>>> > I am trying to write input from Kafka to a SQL server on AWS, but
>>> I have
>>> > difficulties.
>>> >
>>> > I get the following error could not find implicit value for
>>> evidence
>>> > parameter of type
>>> >
>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>> > [error]   ^
>>> >
>>> > Any help is appreciated
>>> >
>>> > I am not sure whether my approach is correct or not but my code is
>>> > as follows:
>>> >
>>> > import java.util.Properties
>>> >
>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>>> JDBCOutputFormat}
>>> > import
>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>> > import org.apache.flink.streaming.api.scala._
>>> > import org.apache.flink.api.scala._
>>> > import
>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>>> SimpleStringSchema}
>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>>> TableEnvironment, Types}
>>> > import org.apache.flink.types.Row
>>> >
>>> >val properties =new Properties()
>>> >properties.setProperty("bootstrap.servers",b_servers)
>>> >properties.setProperty("zookeeper.connect",zk)
>>> >properties.setProperty("group.id ",
>>> "very_small_test")
>>> >properties.setProperty("ssl.endpoint.identification.algorithm
>>> ", "")
>>> >properties.setProperty("security.protocol", "SSL")
>>> >
>>> >
>>> >val kafkaSource: FlinkKafkaConsumerBase[String] =new
>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>>> properties).setStartFromTimestamp(0)
>>> >
>>> >val settings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> >val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>> >
>>> > val schema =new Schema()
>>> >  

Re: DDL with a field named @timestamp raises org.apache.flink.table.api.SqlParserException

2020-05-22 Thread oliver
谢谢 Benchao
是的,1.10.0,我升级到1.10.1试试

> On May 22, 2020, at 6:48 PM, Benchao Li wrote:
> 
> Hi,
> 
> This looks like a known bug [1]. You are probably on 1.10.0? It has been fixed in 1.10.1.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-16068
> 
> oliver wrote on Fri, May 22, 2020 at 5:38 PM:
> 
>> Hello,
>> We are using Flink 1.10.
>> Some of the business data in Kafka contains a field named `@timestamp`.
>> The DDL is as follows:
>> CREATE TABLE kafkaSrc(
>>`@timestamp` TIMESTAMP(3)
>>,domain VARCHAR
>>,proctime AS proctime()
>> )WITH(
>>'connector.type' = 'kafka',
>>'connector.version' = 'universal',
>>'connector.topic' = ’topic',
>>'connector.startup-mode' = 'latest-offset',
>>'connector.properties.group.id' = ‘id',
>>'connector.properties.zookeeper.connect' = ‘xxx',
>>'connector.properties.bootstrap.servers' = ‘xxx',
>>'format.type' = 'json',
>>'format.derive-schema' = 'true'
>> );
>> 
>> With the following DML:
>> insert into MyResult
>> select
>>,`@timestamp`
>>,domain
>>,proctime
>> from
>>kafkaSrc
>> ;
>> 
>> SQL parsing fails with the following exception:
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> failed. Lexical error at line 1, column 8.  Encountered: "@" (64), after :
>> ""
>>at
>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>> 
>> Debugging shows:
>> 
>> https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52
>> 
>> In the method public SqlNode parse(String sql), the value of the sql argument is: SELECT
>> @timestamp,domain,PROCTIME() FROM __temp_table__
>> 
>> Screenshot: http://chuantu.xyz/t6/735/1590139759x2728303292.png
>> 
>> Also, after removing proctime AS proctime() the results are output normally; not sure whether that helps.
>> 
>> Best,
>> Oliver 云长
>> 
>> 
> 
> -- 
> 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
> 



Re: Flink Window with multiple trigger condition

2020-05-22 Thread aj
I was also thinking of using a processing-time window, but that will not work
for me. I want the window to start when the user's "*search*" event arrives,
so for each user the window starts from that *search* event (see the sketch below).
A tumbling window has a fixed start and end time, so it is not suitable
in my case.
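One way to get a per-user window that opens at the search event (a sketch; the Event type and the 30-minute horizon are assumptions) is to skip the window API and keep the logic in a KeyedProcessFunction keyed by user id: buffer events from the moment the search arrives, register a processing-time timer 30 minutes later, and emit when the timer fires, or earlier if a second closing condition shows up:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: the Event type and the 30-minute horizon are assumptions.
// Keyed by user id; a "window" opens when a "search" event is seen for that user
// and closes 30 minutes of processing time later (or earlier, if an explicit
// end condition is added where indicated).
public class SearchSessionFunction
        extends KeyedProcessFunction<String, SearchSessionFunction.Event, List<SearchSessionFunction.Event>> {

    // placeholder event type, only what the sketch needs
    public static class Event {
        public String userId;
        public String type;
        public String getType() { return type; }
    }

    private transient ListState<Event> buffer;
    private transient ValueState<Long> windowEnd;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", Event.class));
        windowEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("windowEnd", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<List<Event>> out) throws Exception {
        if (windowEnd.value() == null) {
            if (!"search".equals(event.getType())) {
                return;                                    // no open window for this user yet
            }
            long end = ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(end);
            windowEnd.update(end);
        }
        buffer.add(event);
        // a second closing condition could emit here, delete the pending timer and clear the state
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
        List<Event> events = new ArrayList<>();
        for (Event e : buffer.get()) {
            events.add(e);
        }
        out.collect(events);
        buffer.clear();
        windowEnd.clear();
    }
}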




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> To achieve what you have in mind, I think what you have to do is to use a
> processing time window of 30 mins, and have a custom trigger that matches
> the "start" event in the `onElement` method and return
> TriggerResult.FIRE_AND_PURGE.
>
> That way, the window fires either when the processing time has passed, or
> the start event was recieved.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: DDL with a field named @timestamp raises org.apache.flink.table.api.SqlParserException

2020-05-22 Thread Benchao Li
Hi,

This looks like a known bug [1]. You are probably on 1.10.0? It has been fixed in 1.10.1.

[1] https://issues.apache.org/jira/browse/FLINK-16068

oliver wrote on Fri, May 22, 2020 at 5:38 PM:

> Hello,
> We are using Flink 1.10.
> Some of the business data in Kafka contains a field named `@timestamp`.
> The DDL is as follows:
> CREATE TABLE kafkaSrc(
> `@timestamp` TIMESTAMP(3)
> ,domain VARCHAR
> ,proctime AS proctime()
>  )WITH(
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = ’topic',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.group.id' = ‘id',
> 'connector.properties.zookeeper.connect' = ‘xxx',
> 'connector.properties.bootstrap.servers' = ‘xxx',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
>  );
>
> With the following DML:
> insert into MyResult
> select
> ,`@timestamp`
> ,domain
> ,proctime
> from
> kafkaSrc
> ;
>
> SQL parsing fails with the following exception:
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> failed. Lexical error at line 1, column 8.  Encountered: "@" (64), after :
> ""
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
> at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>
> Debugging shows:
>
> https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52
>
> The value of the sql argument passed to public SqlNode parse(String sql) is: SELECT
> @timestamp,domain,PROCTIME() FROM __temp_table__
>
> Screenshot: http://chuantu.xyz/t6/735/1590139759x2728303292.png
>
> Also, after removing proctime AS proctime() the results are output normally; not sure if that helps
>
> Best,
> Oliver 云长
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink 1.8.3 Kubernetes POD OOM

2020-05-22 Thread Andrey Zagrebin
Hi Josson,

Do you use a state backend? Is it RocksDB?

Best,
Andrey

On Fri, May 22, 2020 at 12:58 PM Fabian Hueske  wrote:

> Hi Josson,
>
> I don't have much experience setting memory bounds in Kubernetes myself,
> but my colleague Andrey (in CC) reworked Flink's memory configuration for
> the last release to ease the configuration in container envs.
> He might be able to help.
>
> Best, Fabian
>
> Am Do., 21. Mai 2020 um 18:43 Uhr schrieb Josson Paul <
> jossonp...@gmail.com>:
>
>> Cluster type: Standalone cluster
>> Job Type: Streaming
>> JVM memory: 26.2 GB
>> POD memory: 33 GB
>> CPU: 10 Cores
>> GC: G1GC
>> Flink Version: 1.8.3
>> State back end: File based
>> NETWORK_BUFFERS_MEMORY_FRACTION : 0.02f of the Heap
>> We are not accessing Direct memory from application. Only Flink uses
>> direct memory
>>
>> We notice that in Flink 1.8.3 over a period of 30 minutes the POD is
>> killed with OOM. The JVM heap is within its limit.
>> We read from Kafka and have windows in the application. Our Sink is
>> either Kafka or Elastic Search
>> *The same application/job was working perfectly in Flink 1.4.1 with the
>> same input rate and output rate*
>> No back pressure
>> *I have attached few Grafana charts as PDF*
>> Any idea why the off heap memory / outside JVM memory is going up and
>> eventually reaching the limit.
>>
>>  Java Heap (reserved=26845184KB, committed=26845184KB)
>> (mmap: reserved=26845184KB, committed=26845184KB)
>>
>> - Class (reserved=1241866KB, committed=219686KB)
>> (classes #36599)
>> (malloc=4874KB #74568)
>> (mmap: reserved=1236992KB, committed=214812KB)
>>
>> - Thread (reserved=394394KB, committed=394394KB)
>> (thread #383)
>> (stack: reserved=392696KB, committed=392696KB)
>> (malloc=1250KB #1920)
>> (arena=448KB #764)
>>
>> - Code (reserved=272178KB, committed=137954KB)
>> (malloc=22578KB #33442)
>> (mmap: reserved=249600KB, committed=115376KB)
>>
>> - GC (reserved=1365088KB, committed=1365088KB)
>> (malloc=336112KB #1130298)
>> (mmap: reserved=1028976KB, committed=1028976KB)
>>
>>
>>
>> --
>> Thanks
>> Josson
>>
>


How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

2020-05-22 Thread Jary Zhen
Hello everyone,

   First, a brief pipeline introduction:
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  consume multi kafka topic
  -> union them
  -> assignTimestampsAndWatermarks
  -> keyby
  -> window()  and so on …
This is a fairly standard way to use Flink to process data in a production
environment.
But if I want to test the pipeline above, I need to use the
FlinkKafkaConsumer.setStartFromTimestamp(timestamp) API to consume 'past' data.
So my question is how to control the 'step' balance, as one topic produces
3 records per second while another topic produces 3 per second.

I don’t know if I describe clearly . so any suspicion please let me know

Tks
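
For what it's worth, here is a minimal sketch of replaying historical data from a timestamp while letting event time, rather than consumption speed, govern how the topics line up. The broker address, topic names, start timestamp, out-of-orderness bound and the timestamp parsing are all placeholders, not values from the original pipeline.

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReplayFromTimestampSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(Arrays.asList("topicA", "topicB"), new SimpleStringSchema(), props);
        // Replay everything newer than this epoch-millis timestamp (placeholder value).
        consumer.setStartFromTimestamp(1590000000000L);

        DataStream<String> events = env
                .addSource(consumer)
                // Event time comes from each record, so the "step" of each topic is governed
                // by its own timestamps rather than by how fast it is consumed.
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(String record) {
                                return parseEventTime(record); // placeholder parser
                            }
                        });

        events.print();
        env.execute("replay-from-timestamp");
    }

    private static long parseEventTime(String record) {
        return System.currentTimeMillis(); // placeholder: extract the event-time field here
    }
}

Downstream operators track the minimum watermark across all their inputs and partitions, which is what bounds the event-time skew between the topics during the replay.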



Re: Flink Dashboard UI Tasks hard limit

2020-05-22 Thread Xintong Song
Hi Vijay,

I don't think your problem is related to the number of open files. The
parallelism of your job is decided before it actually tries to open the files.
And if the OS limit for open files were reached, you would see a job
execution failure, not a successful execution with a lower parallelism.

Could you share some more information about your use case?

   - What kind of job are you executing? Is it a streaming or batch
   processing job?
   - Which Flink deployment do you use? Standalone? Yarn?
   - It would be helpful if you can share the Flink logs.


Thank you~

Xintong Song



On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan 
wrote:

> Hi,
> I have increased the number of slots available but the Job is not using
> all the slots but runs into this approximate 18000 Tasks limit. Looking
> into the source code, it seems to be opening file -
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
> So, do I have to tune the ulimit or something similar at the Ubuntu O/S
> level to increase number of tasks available ? What I am confused about is
> the ulimit is per machine but the ExecutionGraph is across many machines ?
> Please pardon my ignorance here. Does number of tasks equate to number of
> open files. I am using 15 slots per TaskManager on AWS m5.4xlarge which has
> 16 vCPUs.
>
> TIA.
>
> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>>
>> Flink Dashboard UI seems to show tasks having a hard limit for Tasks
>> column around 18000 on a Ubuntu Linux box.
>> I kept increasing the number of slots per task manager to 15 and number
>> of slots increased to 705 but the slots to tasks
>> stayed at around 18000. Below 18000 tasks, the Flink Job is able to start
>> up.
>> Even though I increased the number of slots, it still works when 312
>> slots are being used.
>>
>> taskmanager.numberOfTaskSlots: 15
>>
>> What knob can I tune to increase the number of Tasks ?
>>
>> Pls find attached the Flink Dashboard UI.
>>
>> TIA,
>>
>>
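
For context, the task count shown in the dashboard is essentially the sum of the parallelism of each job vertex (a chain of operators counts once), so it is controlled from the job itself rather than from the OS. A small sketch with hypothetical parallelism values:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TaskCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(32); // hypothetical default parallelism

        env.fromElements(1, 2, 3).name("source").setParallelism(1)          // 1 task
           .map(new MapFunction<Integer, Integer>() {                        // 32 tasks (default)
               @Override
               public Integer map(Integer x) {
                   return x * 2;
               }
           }).name("map")
           .print().name("sink").setParallelism(4);                          // 4 tasks

        // Total tasks here is roughly 1 + 32 + 4 = 37 (fewer if operators get chained).
        // The number of slots needed is bounded by the largest parallelism in a
        // slot-sharing group (32 here), not by the total task count.
        env.execute("task-count-sketch");
    }
}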


Re: Using Queryable State within 1 job + docs suggestion

2020-05-22 Thread m@xi
Hi Gordon,

Yes, we are well aware of the inconsistencies that can (and will) emerge
while using queryable state like this. However, we will handle them manually
to ensure the correctness of our target applications.

Therefore, could you help with Annemarie's question, or do you know someone
here on the Nabble list who has done this before and could be included in
the discussion?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
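
For readers of this thread, a minimal sketch of the two halves involved: registering the state as queryable inside the job and querying it from a client. The state name, key, host, port and job id are placeholders, and, as discussed above, such reads are not consistent with in-flight updates.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateSketch {
    public static void main(String[] args) throws Exception {
        // The job side registers the same descriptor, calls setQueryable("count-query"),
        // and obtains the state from the runtime context inside a keyed function.
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Types.LONG);
        descriptor.setQueryable("count-query");

        // Client side: point at a TaskManager running the queryable state proxy (default port 9069).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
        client.setExecutionConfig(new ExecutionConfig());

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // placeholder job id
                "count-query",
                "some-key",           // key to look up
                Types.STRING,         // key type information
                descriptor);

        ValueState<Long> state = future.join();
        System.out.println("current count: " + state.value());
    }
}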


Re: Flink 1.8.3 Kubernetes POD OOM

2020-05-22 Thread Fabian Hueske
Hi Josson,

I don't have much experience setting memory bounds in Kubernetes myself,
but my colleague Andrey (in CC) reworked Flink's memory configuration for
the last release to ease the configuration in container envs.
He might be able to help.

Best, Fabian

Am Do., 21. Mai 2020 um 18:43 Uhr schrieb Josson Paul :

> Cluster type: Standalone cluster
> Job Type: Streaming
> JVM memory: 26.2 GB
> POD memory: 33 GB
> CPU: 10 Cores
> GC: G1GC
> Flink Version: 1.8.3
> State back end: File based
> NETWORK_BUFFERS_MEMORY_FRACTION : 0.02f of the Heap
> We are not accessing Direct memory from application. Only Flink uses
> direct memory
>
> We notice that in Flink 1.8.3 over a period of 30 minutes the POD is
> killed with OOM. The JVM heap is within its limit.
> We read from Kafka and have windows in the application. Our Sink is either
> Kafka or Elastic Search
> *The same application/job was working perfectly in Flink 1.4.1 with the
> same input rate and output rate*
> No back pressure
> *I have attached few Grafana charts as PDF*
> Any idea why the off heap memory / outside JVM memory is going up and
> eventually reaching the limit.
>
>  Java Heap (reserved=26845184KB, committed=26845184KB)
> (mmap: reserved=26845184KB, committed=26845184KB)
>
> - Class (reserved=1241866KB, committed=219686KB)
> (classes #36599)
> (malloc=4874KB #74568)
> (mmap: reserved=1236992KB, committed=214812KB)
>
> - Thread (reserved=394394KB, committed=394394KB)
> (thread #383)
> (stack: reserved=392696KB, committed=392696KB)
> (malloc=1250KB #1920)
> (arena=448KB #764)
>
> - Code (reserved=272178KB, committed=137954KB)
> (malloc=22578KB #33442)
> (mmap: reserved=249600KB, committed=115376KB)
>
> - GC (reserved=1365088KB, committed=1365088KB)
> (malloc=336112KB #1130298)
> (mmap: reserved=1028976KB, committed=1028976KB)
>
>
>
> --
> Thanks
> Josson
>


Re: Writing to SQL server

2020-05-22 Thread Flavio Pompermaier
No sorry, you're right. The JDBCOutputFormat should work. I got confused
with the Table API.
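
For completeness, a minimal sketch of the JDBCOutputFormat route with the Microsoft JDBC driver on the classpath. The URL, credentials and statement are placeholders, and setting a batch interval of 1 is only a debugging aid: rows are flushed when the batch interval is reached or the format is closed, which is worth checking when "no data arrives".

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class SqlServerSinkSketch {

    public static void attachSink(DataStream<Row> dsRow) {
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .setDBUrl("jdbc:sqlserver://<host>:1433;databaseName=<db>")   // placeholder URL
                .setUsername("<user>")
                .setPassword("<password>")
                .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
                .setSqlTypes(new int[] {
                        java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR})
                .setBatchInterval(1)   // flush every record while debugging
                .finish();

        dsRow.writeUsingOutputFormat(jdbcOutput);
    }
}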

On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <
m...@berlingskemedia.dk> wrote:

> Hi again,
>
> I am a bit confused as to why the generic jdbc connector would not work
> with sql-server?
>
> Can you explain a bit more?
>
>
> Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
> m...@berlingskemedia.dk>:
>
>> Hi Flavio,
>>
>> Thanks for your reply. I will try another way then.
>>
>>
>> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> I expect you to see some exception somewhere, that sql server dialect is
>>> not supported yet.
>>>
>>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
>>> m...@berlingskemedia.dk> wrote:
>>>
 Hi Flavio,

 Thank you so much! Thought i had that import but misread it.

 The code does not give any errors, but no data is written to the sql
 server. Can you see why that is?



 Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
 pomperma...@okkam.it>:

> SQL server should not be supported from what I know..for this I
> opened a PR[1] that I should rebase.
> If someone is interested in I could do it
>
> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>
> On Fri, May 22, 2020 at 8:35 AM Timo Walther 
> wrote:
>
>> Hi Martin,
>>
>> usually, this error occurs when people forget to add
>> `org.apache.flink.api.scala._` to their imports. It is triggered by
>> the
>> Scala macro that the DataStream API uses for extracting types.
>>
>> Can you try to call `result.toAppendStream[Row]` directly? This
>> should
>> work if you import `org.apache.flink.table.api.scala._`.
>>
>> Maybe this example helps:
>>
>>
>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>
>> Regards,
>> Timo
>>
>>
>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>> > Hi,
>> >
>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>> have
>> > difficulties.
>> >
>> > I get the following error could not find implicit value for
>> evidence
>> > parameter of type
>> >
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>> > [error]   ^
>> >
>> > Any help is appreciated
>> >
>> > I am not sure whether my approach is correct or not but my code is
>> > as follows:
>> >
>> > import java.util.Properties
>> >
>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>> JDBCOutputFormat}
>> > import
>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>> > import org.apache.flink.streaming.api.scala._
>> > import org.apache.flink.api.scala._
>> > import
>> org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>> SimpleStringSchema}
>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>> TableEnvironment, Types}
>> > import org.apache.flink.types.Row
>> >
>> >val properties =new Properties()
>> >properties.setProperty("bootstrap.servers",b_servers)
>> >properties.setProperty("zookeeper.connect",zk)
>> >properties.setProperty("group.id ",
>> "very_small_test")
>> >properties.setProperty("ssl.endpoint.identification.algorithm ",
>> "")
>> >properties.setProperty("security.protocol", "SSL")
>> >
>> >
>> >val kafkaSource: FlinkKafkaConsumerBase[String] =new
>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>> properties).setStartFromTimestamp(0)
>> >
>> >val settings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >val env = StreamExecutionEnvironment.getExecutionEnvironment
>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>> >
>> > val schema =new Schema()
>> >  .field("fullVisitorId",Types.STRING)
>> >  .field("eventTime",Types.STRING)
>> >  .field("eventID",Types.STRING)
>> >  .field("eventType",Types.STRING)
>> >  .field("page",Types.MAP( Types.STRING, Types.STRING))
>> >  .field("CustomDimensions",Types.MAP( Types.STRING,
>> Types.STRING))
>> >
>> >
>> >tableEnv.connect(new Kafka()
>> >.version("universal")
>> >

Re: Writing to SQL server

2020-05-22 Thread Martin Frank Hansen
Hi again,

I am a bit confused as to why the generic jdbc connector would not work
with sql-server?

Can you explain a bit more?


Den fre. 22. maj 2020 kl. 11.33 skrev Martin Frank Hansen <
m...@berlingskemedia.dk>:

> Hi Flavio,
>
> Thanks for your reply. I will try another way then.
>
>
> Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> I expect you to see some exception somewhere, that sql server dialect is
>> not supported yet.
>>
>> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
>> m...@berlingskemedia.dk> wrote:
>>
>>> Hi Flavio,
>>>
>>> Thank you so much! Thought i had that import but misread it.
>>>
>>> The code does not give any errors, but no data is written to the sql
>>> server. Can you see why that is?
>>>
>>>
>>>
>>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
>>> pomperma...@okkam.it>:
>>>
 SQL server should not be supported from what I know..for this I
 opened a PR[1] that I should rebase.
 If someone is interested in I could do it

 [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )

 On Fri, May 22, 2020 at 8:35 AM Timo Walther 
 wrote:

> Hi Martin,
>
> usually, this error occurs when people forget to add
> `org.apache.flink.api.scala._` to their imports. It is triggered by
> the
> Scala macro that the DataStream API uses for extracting types.
>
> Can you try to call `result.toAppendStream[Row]` directly? This should
> work if you import `org.apache.flink.table.api.scala._`.
>
> Maybe this example helps:
>
>
> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>
> Regards,
> Timo
>
>
> On 22.05.20 08:02, Martin Frank Hansen wrote:
> > Hi,
> >
> > I am trying to write input from Kafka to a SQL server on AWS, but I
> have
> > difficulties.
> >
> > I get the following error could not find implicit value for evidence
> > parameter of type
> >
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
> > [error]   ^
> >
> > Any help is appreciated
> >
> > I am not sure whether my approach is correct or not but my code is
> > as follows:
> >
> > import java.util.Properties
> >
> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
> JDBCOutputFormat}
> > import
> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
> > import org.apache.flink.streaming.api.scala._
> > import org.apache.flink.api.scala._
> > import
> org.apache.flink.api.common.serialization.{SimpleStringEncoder,
> SimpleStringSchema}
> > import org.apache.flink.table.api.scala.StreamTableEnvironment
> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
> TableEnvironment, Types}
> > import org.apache.flink.types.Row
> >
> >val properties =new Properties()
> >properties.setProperty("bootstrap.servers",b_servers)
> >properties.setProperty("zookeeper.connect",zk)
> >properties.setProperty("group.id ",
> "very_small_test")
> >properties.setProperty("ssl.endpoint.identification.algorithm ",
> "")
> >properties.setProperty("security.protocol", "SSL")
> >
> >
> >val kafkaSource: FlinkKafkaConsumerBase[String] =new
> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
> properties).setStartFromTimestamp(0)
> >
> >val settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >val env = StreamExecutionEnvironment.getExecutionEnvironment
> > val tableEnv = StreamTableEnvironment.create(env, settings)
> >
> > val schema =new Schema()
> >  .field("fullVisitorId",Types.STRING)
> >  .field("eventTime",Types.STRING)
> >  .field("eventID",Types.STRING)
> >  .field("eventType",Types.STRING)
> >  .field("page",Types.MAP( Types.STRING, Types.STRING))
> >  .field("CustomDimensions",Types.MAP( Types.STRING,
> Types.STRING))
> >
> >
> >tableEnv.connect(new Kafka()
> >.version("universal")
> >.topic("very_small_test")
> >.properties(properties)
> >.startFromEarliest()
> >   )
> >  .withFormat(
> >  new Json()
> >.failOnMissingField(false)
> >.deriveSchema()
> >)
> >  .withSchema(schema)
> >  .inAppendMode()
> >  

Re: Adaptive Watermarks Generator

2020-05-22 Thread Fabian Hueske
Hi,

The code of the implementation is linked in the paper:
https://github.com/DataSystemsGroupUT/Adaptive-Watermarks
Since this is a prototype for a research paper, I'm doubtful that the
project is maintained.
I also didn't find an open-source license attached to the code.
Hence adding the project to flink-packages.org wouldn't help a lot.

One option would be to get permission to fork the project (by adding a
permissive open source license) and finding somebody who forks and
maintains the code.
Alternatively, it might be possible to create a completely new
implementation based on the paper (I guess).
Then it could be added to flink-packages.org.

Best, Fabian

Am Fr., 22. Mai 2020 um 09:01 Uhr schrieb 杨东晓 :

> Thanks Robert! Could you point out the repository name ? I didn't find it
> from  flink-packages.org  Packages list.
>
> Robert Metzger  于2020年5月19日周二 下午5:02写道:
>
>> Hi,
>> I haven't seen this paper before. I'm not aware of any efforts in the
>> community to integrate this watermark generator.
>> I'm also not sure if we would integrate it directory into the main Flink
>> repository. What I could see is this generator being maintained in a public
>> GitHub repository, linked from flink-packages.org
>>
>> Best,
>> Robert
>>
>> On Sat, May 9, 2020 at 8:47 AM 杨东晓  wrote:
>>
>>> Hi , I noticed there is a paper describe about adaptive watermark
>>> generator on top of Apache Flink v1.6.2 :
>>> https://www.dfki.de/fileadmin/user_upload/import/10260_awad-adaptive-watermarks.pdf
>>>
>>> This looks like a more precise generator with much less data drop . Does
>>> anybody know more detail about this or does Flink community have any plan
>>> about this?
>>>
>>> Thanks!
>>>
>>


Defining an `@timestamp` field in DDL raises org.apache.flink.table.api.SqlParserException

2020-05-22 Thread oliver
hello,
We are using Flink 1.10.
Some of our business Kafka data contains a field named `@timestamp`.
The DDL is as follows:
CREATE TABLE kafkaSrc(
`@timestamp` TIMESTAMP(3)
,domain VARCHAR
,proctime AS proctime()
 )WITH(
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = ’topic',
'connector.startup-mode' = 'latest-offset',
'connector.properties.group.id' = ‘id',
'connector.properties.zookeeper.connect' = ‘xxx',
'connector.properties.bootstrap.servers' = ‘xxx',
'format.type' = 'json',
'format.derive-schema' = 'true'
 );

If the DML is:
insert into MyResult
select
,`@timestamp`
,domain
,proctime
from
kafkaSrc
;

then SQL parsing fails with the following exception:
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Lexical error at line 1, column 8.  Encountered: "@" (64), after : ""
at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

Debugging shows:
https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52

The value of the sql argument passed to public SqlNode parse(String sql) is: SELECT
@timestamp,domain,PROCTIME() FROM __temp_table__

Screenshot: http://chuantu.xyz/t6/735/1590139759x2728303292.png

Also, after removing proctime AS proctime() the results are output normally; not sure if that helps

Best,
Oliver 云长



Re: Writing to SQL server

2020-05-22 Thread Martin Frank Hansen
Hi Flavio,

Thank you so much! Thought i had that import but misread it.

The code does not give any errors, but no data is written to the sql
server. Can you see why that is?



Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
pomperma...@okkam.it>:

> SQL server should not be supported from what I know..for this I opened a
> PR[1] that I should rebase.
> If someone is interested in I could do it
>
> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>
> On Fri, May 22, 2020 at 8:35 AM Timo Walther  wrote:
>
>> Hi Martin,
>>
>> usually, this error occurs when people forget to add
>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>> Scala macro that the DataStream API uses for extracting types.
>>
>> Can you try to call `result.toAppendStream[Row]` directly? This should
>> work if you import `org.apache.flink.table.api.scala._`.
>>
>> Maybe this example helps:
>>
>>
>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>
>> Regards,
>> Timo
>>
>>
>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>> > Hi,
>> >
>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>> have
>> > difficulties.
>> >
>> > I get the following error could not find implicit value for evidence
>> > parameter of type
>> >
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>> > [error]   ^
>> >
>> > Any help is appreciated
>> >
>> > I am not sure whether my approach is correct or not but my code is
>> > as follows:
>> >
>> > import java.util.Properties
>> >
>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>> JDBCOutputFormat}
>> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>> > import org.apache.flink.streaming.api.scala._
>> > import org.apache.flink.api.scala._
>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>> SimpleStringSchema}
>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>> TableEnvironment, Types}
>> > import org.apache.flink.types.Row
>> >
>> >val properties =new Properties()
>> >properties.setProperty("bootstrap.servers",b_servers)
>> >properties.setProperty("zookeeper.connect",zk)
>> >properties.setProperty("group.id ",
>> "very_small_test")
>> >properties.setProperty("ssl.endpoint.identification.algorithm ", "")
>> >properties.setProperty("security.protocol", "SSL")
>> >
>> >
>> >val kafkaSource: FlinkKafkaConsumerBase[String] =new
>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>> properties).setStartFromTimestamp(0)
>> >
>> >val settings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >val env = StreamExecutionEnvironment.getExecutionEnvironment
>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>> >
>> > val schema =new Schema()
>> >  .field("fullVisitorId",Types.STRING)
>> >  .field("eventTime",Types.STRING)
>> >  .field("eventID",Types.STRING)
>> >  .field("eventType",Types.STRING)
>> >  .field("page",Types.MAP( Types.STRING, Types.STRING))
>> >  .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))
>> >
>> >
>> >tableEnv.connect(new Kafka()
>> >.version("universal")
>> >.topic("very_small_test")
>> >.properties(properties)
>> >.startFromEarliest()
>> >   )
>> >  .withFormat(
>> >  new Json()
>> >.failOnMissingField(false)
>> >.deriveSchema()
>> >)
>> >  .withSchema(schema)
>> >  .inAppendMode()
>> >  .registerTableSource("sql_source")
>> >
>> >
>> > val sqlStatement ="SELECT * from sql_source where
>> CustomDimensions['pagePath'] like '%BT%'"
>> >
>> > val result =tableEnv.sqlQuery(sqlStatement)
>> >
>> >val dsRow =tableEnv.toAppendStream[Row](result)
>> >
>> >
>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>> >  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>> >  .setDBUrl("AWS url")
>> > .setUsername(username)
>> >  .setPassword(password)
>> >  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime,
>> eventID) VALUES
>> > (?, ?, ?)")
>> >  .setBatchInterval(100)
>> >  .finish()
>> >
>> >dsRow.writeUsingOutputFormat(jdbcOutput)
>> >
>> > tableEnv.execute("SQL test")
>> >
>> >
>> > --
>> >
>> > *Best regards
>> >
>> > Martin Hansen*
>> >
>>
>

-- 

Best regards

Martin Hansen


Re: Writing to SQL server

2020-05-22 Thread Martin Frank Hansen
Hi Flavio,

Thanks for your reply. I will try another way then.


Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier <
pomperma...@okkam.it>:

> I expect you to see some exception somewhere, that sql server dialect is
> not supported yet.
>
> On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi Flavio,
>>
>> Thank you so much! Thought i had that import but misread it.
>>
>> The code does not give any errors, but no data is written to the sql
>> server. Can you see why that is?
>>
>>
>>
>> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> SQL server should not be supported from what I know..for this I opened a
>>> PR[1] that I should rebase.
>>> If someone is interested in I could do it
>>>
>>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>>
>>> On Fri, May 22, 2020 at 8:35 AM Timo Walther  wrote:
>>>
 Hi Martin,

 usually, this error occurs when people forget to add
 `org.apache.flink.api.scala._` to their imports. It is triggered by the
 Scala macro that the DataStream API uses for extracting types.

 Can you try to call `result.toAppendStream[Row]` directly? This should
 work if you import `org.apache.flink.table.api.scala._`.

 Maybe this example helps:


 https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

 Regards,
 Timo


 On 22.05.20 08:02, Martin Frank Hansen wrote:
 > Hi,
 >
 > I am trying to write input from Kafka to a SQL server on AWS, but I
 have
 > difficulties.
 >
 > I get the following error could not find implicit value for evidence
 > parameter of type
 >
 org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
 > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
 > [error]   ^
 >
 > Any help is appreciated
 >
 > I am not sure whether my approach is correct or not but my code is
 > as follows:
 >
 > import java.util.Properties
 >
 > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
 > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
 JDBCOutputFormat}
 > import
 org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
 FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
 > import org.apache.flink.streaming.api.scala._
 > import org.apache.flink.api.scala._
 > import
 org.apache.flink.api.common.serialization.{SimpleStringEncoder,
 SimpleStringSchema}
 > import org.apache.flink.table.api.scala.StreamTableEnvironment
 > import org.apache.flink.table.api.{EnvironmentSettings, Table,
 TableEnvironment, Types}
 > import org.apache.flink.types.Row
 >
 >val properties =new Properties()
 >properties.setProperty("bootstrap.servers",b_servers)
 >properties.setProperty("zookeeper.connect",zk)
 >properties.setProperty("group.id ",
 "very_small_test")
 >properties.setProperty("ssl.endpoint.identification.algorithm ",
 "")
 >properties.setProperty("security.protocol", "SSL")
 >
 >
 >val kafkaSource: FlinkKafkaConsumerBase[String] =new
 FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
 properties).setStartFromTimestamp(0)
 >
 >val settings =
 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
 >val env = StreamExecutionEnvironment.getExecutionEnvironment
 > val tableEnv = StreamTableEnvironment.create(env, settings)
 >
 > val schema =new Schema()
 >  .field("fullVisitorId",Types.STRING)
 >  .field("eventTime",Types.STRING)
 >  .field("eventID",Types.STRING)
 >  .field("eventType",Types.STRING)
 >  .field("page",Types.MAP( Types.STRING, Types.STRING))
 >  .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))
 >
 >
 >tableEnv.connect(new Kafka()
 >.version("universal")
 >.topic("very_small_test")
 >.properties(properties)
 >.startFromEarliest()
 >   )
 >  .withFormat(
 >  new Json()
 >.failOnMissingField(false)
 >.deriveSchema()
 >)
 >  .withSchema(schema)
 >  .inAppendMode()
 >  .registerTableSource("sql_source")
 >
 >
 > val sqlStatement ="SELECT * from sql_source where
 CustomDimensions['pagePath'] like '%BT%'"
 >
 > val result =tableEnv.sqlQuery(sqlStatement)
 >
 >val dsRow =tableEnv.toAppendStream[Row](result)
 >
 >
 > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 >  

Re: Writing to SQL server

2020-05-22 Thread Flavio Pompermaier
I expect you to see some exception somewhere, since the SQL Server dialect is
not supported yet.

On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <
m...@berlingskemedia.dk> wrote:

> Hi Flavio,
>
> Thank you so much! Thought i had that import but misread it.
>
> The code does not give any errors, but no data is written to the sql
> server. Can you see why that is?
>
>
>
> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> SQL server should not be supported from what I know..for this I opened a
>> PR[1] that I should rebase.
>> If someone is interested in I could do it
>>
>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )
>>
>> On Fri, May 22, 2020 at 8:35 AM Timo Walther  wrote:
>>
>>> Hi Martin,
>>>
>>> usually, this error occurs when people forget to add
>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the
>>> Scala macro that the DataStream API uses for extracting types.
>>>
>>> Can you try to call `result.toAppendStream[Row]` directly? This should
>>> work if you import `org.apache.flink.table.api.scala._`.
>>>
>>> Maybe this example helps:
>>>
>>>
>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 22.05.20 08:02, Martin Frank Hansen wrote:
>>> > Hi,
>>> >
>>> > I am trying to write input from Kafka to a SQL server on AWS, but I
>>> have
>>> > difficulties.
>>> >
>>> > I get the following error could not find implicit value for evidence
>>> > parameter of type
>>> >
>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
>>> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
>>> > [error]   ^
>>> >
>>> > Any help is appreciated
>>> >
>>> > I am not sure whether my approach is correct or not but my code is
>>> > as follows:
>>> >
>>> > import java.util.Properties
>>> >
>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
>>> JDBCOutputFormat}
>>> > import
>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
>>> > import org.apache.flink.streaming.api.scala._
>>> > import org.apache.flink.api.scala._
>>> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
>>> SimpleStringSchema}
>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment
>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
>>> TableEnvironment, Types}
>>> > import org.apache.flink.types.Row
>>> >
>>> >val properties =new Properties()
>>> >properties.setProperty("bootstrap.servers",b_servers)
>>> >properties.setProperty("zookeeper.connect",zk)
>>> >properties.setProperty("group.id ",
>>> "very_small_test")
>>> >properties.setProperty("ssl.endpoint.identification.algorithm ", "")
>>> >properties.setProperty("security.protocol", "SSL")
>>> >
>>> >
>>> >val kafkaSource: FlinkKafkaConsumerBase[String] =new
>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
>>> properties).setStartFromTimestamp(0)
>>> >
>>> >val settings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> >val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> > val tableEnv = StreamTableEnvironment.create(env, settings)
>>> >
>>> > val schema =new Schema()
>>> >  .field("fullVisitorId",Types.STRING)
>>> >  .field("eventTime",Types.STRING)
>>> >  .field("eventID",Types.STRING)
>>> >  .field("eventType",Types.STRING)
>>> >  .field("page",Types.MAP( Types.STRING, Types.STRING))
>>> >  .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))
>>> >
>>> >
>>> >tableEnv.connect(new Kafka()
>>> >.version("universal")
>>> >.topic("very_small_test")
>>> >.properties(properties)
>>> >.startFromEarliest()
>>> >   )
>>> >  .withFormat(
>>> >  new Json()
>>> >.failOnMissingField(false)
>>> >.deriveSchema()
>>> >)
>>> >  .withSchema(schema)
>>> >  .inAppendMode()
>>> >  .registerTableSource("sql_source")
>>> >
>>> >
>>> > val sqlStatement ="SELECT * from sql_source where
>>> CustomDimensions['pagePath'] like '%BT%'"
>>> >
>>> > val result =tableEnv.sqlQuery(sqlStatement)
>>> >
>>> >val dsRow =tableEnv.toAppendStream[Row](result)
>>> >
>>> >
>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>>> >  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>>> >  .setDBUrl("AWS url")
>>> > .setUsername(username)
>>> >  .setPassword(password)
>>> >  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime,
>>> eventID) VALUES
>>> > (?, ?, ?)")
>>> >  .setBatchInterval(100)
>>> >  .finish()

Re: Question: why do different Flink SQL jobs consuming the same Kafka table (with a groupId specified) output the same data?

2020-05-22 Thread Benchao Li
Hi,

Flink's Kafka connector is implemented on top of the Kafka low-level API: it fetches the current
partition metadata itself and decides on its own which subtask reads which partition.
So if you start two jobs, they are completely independent and are not aware of each other. (The only caveat is that
if you configure the same group id for both, the offsets they commit may overwrite each other.)
The behavior you describe belongs to the Kafka high-level consumer API.

wind.fly@outlook.com  于2020年5月22日周五 下午4:21写道:

> Hi all,
> I am using Flink 1.10.0 and created a table mapped to Kafka under a Hive catalog:
> CREATE TABLE x.log.yanfa_log (
> dt TIMESTAMP(3),
> conn_id STRING,
> sequence STRING,
> trace_id STRING,
> span_info STRING,
> service_id STRING,
> msg_id STRING,
> servicename STRING,
> ret_code STRING,
> duration STRING,
> req_body MAP,
> res_body MAP,
> extra_info MAP,
> WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'x-log-yanfa_log',
> 'connector.properties.bootstrap.servers' = '**:9092',
> 'connector.properties.zookeeper.connect' = '**:2181',
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.fail-on-missing-field' = 'true'
> );
> The program that consumes the table x.log.yanfa_log is as follows:
> Catalog myCatalog = new HiveCatalog("x", "default",
> "D:\\conf", "1.1.0");
> tEnv.registerCatalog("x", myCatalog);
> Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
> tEnv.toAppendStream(rs, Row.class).print();
>
> Then I started 2 jobs of the same program, and both output the same results. My question is: isn't each partition of a Kafka
> topic supposed to be consumed by at most one consumer of a group? Why do the 2 jobs output the same results?
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Question: why do different Flink SQL jobs consuming the same Kafka table (with a groupId specified) output the same data?

2020-05-22 Thread wind.fly....@outlook.com
Hi all,
I am using Flink 1.10.0 and created a table mapped to Kafka under a Hive catalog:
CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
duration STRING,
req_body MAP,
res_body MAP,
extra_info MAP,
WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'x-log-yanfa_log',
'connector.properties.bootstrap.servers' = '**:9092',
'connector.properties.zookeeper.connect' = '**:2181',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'update-mode' = 'append',
'format.type' = 'json',
'format.fail-on-missing-field' = 'true'
);
The program that consumes the table x.log.yanfa_log is as follows:
Catalog myCatalog = new HiveCatalog("x", "default",
"D:\\conf", "1.1.0");
tEnv.registerCatalog("x", myCatalog);
Table rs = tEnv.sqlQuery("select * from x.log.yanfa_log");
tEnv.toAppendStream(rs, Row.class).print();

Then I started 2 jobs of the same program, and both output the same results. My question is: isn't each partition of a Kafka
topic supposed to be consumed by at most one consumer of a group? Why do the 2 jobs output the same results?


Flink writing to Kafka with EOS enabled fails

2020-05-22 Thread ????????????????
Hi All,


When writing to Kafka from Flink with exactly-once semantics (EOS) enabled, the job fails with
the error below. Has anyone run into this?


2020-05-21 16:52:15,057 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source (1/1) (f65b2869d898a050238c53f9fbc9573b) switched from DEPLOYING to 
RUNNING. 2020-05-21 16:52:15,062 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Co-Process-Broadcast - Map - Sink: Unnamed (1/1) 
(d0739aa81367223f83a63a86307fffb3) switched from DEPLOYING to RUNNING. 
2020-05-21 16:52:15,276 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Co-Process-Broadcast - Map - Sink: Unnamed (1/1) 
(d0739aa81367223f83a63a86307fffb3) switched from RUNNING to FAILED. 
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: 
Could not find a coordinator with type TRANSACTION with key 
Co-Process-Broadcast - Map - Sink: 
Unnamed-c4ffe334eee7821772b24597621064ce-32 due tounexpected error: The server 
experienced an unexpected error when processing the request.   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
 at 
java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)  
 at 
java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) at 
java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)  at 
java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)  at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)   
at 
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1101)
   at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1037)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
 at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
   at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) 
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)   at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.kafka.common.KafkaException: Could not find a coordinator with type 
TRANSACTION with key Co-Process-Broadcast - Map - Sink: 
Unnamed-c4ffe334eee7821772b24597621064ce-32 due tounexpected error: The server 
experienced an unexpected error when processing the request.  at 
org.apache.kafka.clients.producer.internals.TransactionManager$FindCoordinatorHandler.handleResponse(TransactionManager.java:1142)
   at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
 at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)  at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:288)  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)  
... 1 more
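
For context, a minimal sketch of how an exactly-once Kafka sink is typically set up with the universal connector. The broker address, topic and timeout value are placeholders; a "could not find a coordinator with type TRANSACTION" error is often related to broker-side transaction support (for example the settings of the __transaction_state topic on small clusters), which is worth checking independently of the job code.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkSketch {
    public static FlinkKafkaProducer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)); // placeholder topic

        return new FlinkKafkaProducer<>(
                "output-topic",                                      // default topic (placeholder)
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}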

Question about side output support for windows of the coGroup operator in flink-streaming

2020-05-22 Thread zhaiao...@58.com
In Flink 1.10.0, the coGroup operator of the streaming API does not yet support side outputs for handling late messages. Does the community plan to support this?


zhaiao...@58ganji.com


Re: Password problem when defining an index pointing to Elasticsearch in the SQL client

2020-05-22 Thread naturalfree
OK, thanks a lot.


| |
naturalfree
|
|
邮箱:naturalf...@126.com
|

签名由 网易邮箱大师 定制

在2020年05月22日 11:15,Yangze Guo 写道:
1.11 has already reached feature freeze, so this feature can land in 1.12 at the earliest. If you need it urgently, take a look at the DataStream
API's ElasticsearchSink, which supports authentication; you can also implement a TableSink yourself.

Best,
Yangze Guo

On Fri, May 22, 2020 at 9:59 AM Rui Li  wrote:
>
> Hi, this is not supported yet, but there is a PR implementing this feature: https://github.com/apache/flink/pull/11822
>
> On Wed, May 20, 2020 at 4:10 PM naturalfree  wrote:
>
> > I defined an index pointing to ES in the Flink SQL client configuration file, but found there is no property for setting a username and password. Does the current ES connector support authentication?
> >
> > | |
> > naturalfree
> > |
> > |
> > 邮箱:naturalf...@126.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>
>
>
> --
> Best regards!
> Rui Li
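
A minimal sketch of the authenticated DataStream ElasticsearchSink that Yangze refers to, assuming the elasticsearch7 connector; the host, index name and credentials are placeholders.

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class SecureEsSinkSketch {
    public static ElasticsearchSink<String> build() {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http")); // placeholder

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                hosts,
                (ElasticsearchSinkFunction<String>) (element, ctx, indexer) -> {
                    IndexRequest request = Requests.indexRequest()
                            .index("my-index")                                      // placeholder index
                            .source(Collections.singletonMap("data", element));
                    indexer.add(request);
                });

        // Basic authentication is attached through the REST client factory.
        builder.setRestClientFactory(restClientBuilder -> {
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY, new UsernamePasswordCredentials("user", "password")); // placeholders
            restClientBuilder.setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        });

        return builder.build();
    }
}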


Very high latency when using event time with session windows

2020-05-22 Thread 李佳宸
Hi everyone,

I have run into a problem whose cause I cannot figure out, and I would like to ask for advice.

My code keys by userId and then uses a session window to aggregate per userId and run a topN function.

The code looks roughly like this:
// TopN aggregation
DataStream itemList = resultDataStream
.assignTimestampsAndWatermarks(
new
BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(100))
{
@Override
 public long extractTimestamp(PredictResult
predictResult) {
 return predictResult.getDate_timestamp();
   }
}
)
.keyBy("userId")

.window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
.process(new TopNService(11));
itemList.print("IRS_RESULT: ");


The job latency is extremely high, up to 30 seconds, which is completely unacceptable. At first I thought my
topN function was the problem, but after switching to a processing-time session window the latency dropped to under one second.
The drawback of processing time is that the gap is hard to estimate: too large and it hurts latency, too small and the expected aggregation cannot complete, which causes errors (and unstable runs).
I do not quite understand why this happens; any ideas on how to approach it would be appreciated.


Thanks

// topN function

public static class TopNService extends
ProcessWindowFunction {

private final int topSize;

public TopNService(int topSize) {
this.topSize = topSize;
}
@Override
public void process(Tuple tuple, Context context,
Iterable iterable, Collector collector) throws
Exception {
List allItems = new ArrayList<>();
for (PredictResult predictResult:iterable){
allItems.add(predictResult);
}
allItems.sort(new Comparator() {
@Override
public int compare(PredictResult o1, PredictResult o2) {
return o2.probability.compareTo(o1.probability);
}
});
int userId = allItems.get(0).userId ;
String logonType=allItems.get(0).getLogonType();
StringBuilder result = new StringBuilder();
for (int i=0;i

Re: Writing to SQL server

2020-05-22 Thread Flavio Pompermaier
SQL Server is not supported as far as I know; for this I opened a
PR [1] that I should rebase.
If someone is interested in it, I could do that.

[1] https://github.com/apache/flink/pull/12038 (FLINK-14101 )

On Fri, May 22, 2020 at 8:35 AM Timo Walther  wrote:

> Hi Martin,
>
> usually, this error occurs when people forget to add
> `org.apache.flink.api.scala._` to their imports. It is triggered by the
> Scala macro that the DataStream API uses for extracting types.
>
> Can you try to call `result.toAppendStream[Row]` directly? This should
> work if you import `org.apache.flink.table.api.scala._`.
>
> Maybe this example helps:
>
>
> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
>
> Regards,
> Timo
>
>
> On 22.05.20 08:02, Martin Frank Hansen wrote:
> > Hi,
> >
> > I am trying to write input from Kafka to a SQL server on AWS, but I have
> > difficulties.
> >
> > I get the following error could not find implicit value for evidence
> > parameter of type
> >
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
> > [error]   val dsRow = tableEnv.toAppendStream[Row](result)
> > [error]   ^
> >
> > Any help is appreciated
> >
> > I am not sure whether my approach is correct or not but my code is
> > as follows:
> >
> > import java.util.Properties
> >
> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat,
> JDBCOutputFormat}
> > import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
> > import org.apache.flink.streaming.api.scala._
> > import org.apache.flink.api.scala._
> > import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
> SimpleStringSchema}
> > import org.apache.flink.table.api.scala.StreamTableEnvironment
> > import org.apache.flink.table.api.{EnvironmentSettings, Table,
> TableEnvironment, Types}
> > import org.apache.flink.types.Row
> >
> >val properties =new Properties()
> >properties.setProperty("bootstrap.servers",b_servers)
> >properties.setProperty("zookeeper.connect",zk)
> >properties.setProperty("group.id ",
> "very_small_test")
> >properties.setProperty("ssl.endpoint.identification.algorithm ", "")
> >properties.setProperty("security.protocol", "SSL")
> >
> >
> >val kafkaSource: FlinkKafkaConsumerBase[String] =new
> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(),
> properties).setStartFromTimestamp(0)
> >
> >val settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >val env = StreamExecutionEnvironment.getExecutionEnvironment
> > val tableEnv = StreamTableEnvironment.create(env, settings)
> >
> > val schema =new Schema()
> >  .field("fullVisitorId",Types.STRING)
> >  .field("eventTime",Types.STRING)
> >  .field("eventID",Types.STRING)
> >  .field("eventType",Types.STRING)
> >  .field("page",Types.MAP( Types.STRING, Types.STRING))
> >  .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))
> >
> >
> >tableEnv.connect(new Kafka()
> >.version("universal")
> >.topic("very_small_test")
> >.properties(properties)
> >.startFromEarliest()
> >   )
> >  .withFormat(
> >  new Json()
> >.failOnMissingField(false)
> >.deriveSchema()
> >)
> >  .withSchema(schema)
> >  .inAppendMode()
> >  .registerTableSource("sql_source")
> >
> >
> > val sqlStatement ="SELECT * from sql_source where
> CustomDimensions['pagePath'] like '%BT%'"
> >
> > val result =tableEnv.sqlQuery(sqlStatement)
> >
> >val dsRow =tableEnv.toAppendStream[Row](result)
> >
> >
> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
> >  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
> >  .setDBUrl("AWS url")
> > .setUsername(username)
> >  .setPassword(password)
> >  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime,
> eventID) VALUES
> > (?, ?, ?)")
> >  .setBatchInterval(100)
> >  .finish()
> >
> >dsRow.writeUsingOutputFormat(jdbcOutput)
> >
> > tableEnv.execute("SQL test")
> >
> >
> > --
> >
> > *Best regards
> >
> > Martin Hansen*
> >
>


Re: Adaptive Watermarks Generator

2020-05-22 Thread 杨东晓
Thanks Robert! Could you point out the repository name? I didn't find it
in the flink-packages.org packages list.

Robert Metzger  于2020年5月19日周二 下午5:02写道:

> Hi,
> I haven't seen this paper before. I'm not aware of any efforts in the
> community to integrate this watermark generator.
> I'm also not sure if we would integrate it directory into the main Flink
> repository. What I could see is this generator being maintained in a public
> GitHub repository, linked from flink-packages.org
>
> Best,
> Robert
>
> On Sat, May 9, 2020 at 8:47 AM 杨东晓  wrote:
>
>> Hi , I noticed there is a paper describe about adaptive watermark
>> generator on top of Apache Flink v1.6.2 :
>> https://www.dfki.de/fileadmin/user_upload/import/10260_awad-adaptive-watermarks.pdf
>>
>> This looks like a more precise generator with much less data drop . Does
>> anybody know more detail about this or does Flink community have any plan
>> about this?
>>
>> Thanks!
>>
>


Flink TTL for MapStates and Sideoutputs implementations

2020-05-22 Thread Jaswin Shah
public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction {

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState cartState = null;

/**
 * Map state for pg messages, orderId+mid is key and pgMessage is value.
 */
private static MapState pgState = null;

/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {
MapStateDescriptor cartStateDescriptor = new 
MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);

MapStateDescriptor 
pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector collector) throws Exception {
String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = 
pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector collector) throws Exception {
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}

/**
 * Create a ResultMessage from the cart and pg messages.
 *
 * @param cartMessage
 * @param pgMessage
 */
private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage, Collector<ResultMessage> collector) {
    ResultMessage resultMessage = new ResultMessage();
    Payment payment = null;

    // Logic should be in cart: check
    for (Payment pay : cartMessage.getPayments()) {
        if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG())
                && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
            payment = pay;
            break;
        }
    }
    if (Objects.isNull(payment)) {
        return;
    }

    resultMessage.setOrderId(cartMessage.getId());
    resultMessage.setMid(payment.getMid());

    resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
    resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

    resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
    resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

    resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
    resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));

    resultMessage.setCartPaymethod(payment.getPayment_method());
    resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

    checkDescripancyAndCollectResult(resultMessage, collector);
}

/**
 * Evaluate whether there is a discrepancy in any of the fields between the messages
 * from the two different systems. All the discrepancy logic lives here.
 *
 * @param resultMessage
 */
private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

    if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(),
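
For context, here is a minimal sketch of how a CoProcessFunction like the one above is typically wired into a job. The class and variable names below (CartPgJobSketch, cartSource, pgSource, resultSink) are placeholders for illustration, not taken from the original code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CartPgJobSketch {

    // Sketch only: sources and sink are passed in; CartMessage, PaymentNotifyRequestWrapper,
    // ResultMessage and CartPGCoprocessFunction are the classes from the code above.
    public static void buildJob(StreamExecutionEnvironment env,
                                SourceFunction<CartMessage> cartSource,
                                SourceFunction<PaymentNotifyRequestWrapper> pgSource,
                                SinkFunction<ResultMessage> resultSink) throws Exception {

        DataStream<CartMessage> carts = env.addSource(cartSource);
        DataStream<PaymentNotifyRequestWrapper> pgNotifications = env.addSource(pgSource);

        carts.connect(pgNotifications)
                // Key both streams by the same join key so the MapState lookups
                // in processElement1/processElement2 hit the same state slot.
                .keyBy(CartMessage::createJoinStringCondition,
                       PaymentNotifyRequestWrapper::createJoinStringCondition)
                .process(new CartPGCoprocessFunction())
                .addSink(resultSink);

        env.execute("cart-pg-reconciliation");
    }
}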

Why doesn't Flink Connector JDBC support LocalDateTime?

2020-05-22 Thread forideal
Hello, my friends


env: Flink 1.10, Blink Planner

table source:
CREATE TABLE josn_table (
    order_id VARCHAR,
    event_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector.properties.0.key' = 'bootstrap.servers',
    'connector.properties.0.value' = 'localhost:9092',
    'connector.property-version' = '1',
    'connector.startup-mode' = 'earliest-offset',
    'connector.topic' = 'raw',
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'format.derive-schema' = 'true',
    'format.property-version' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
)

mysql dim table:
CREATE TABLE ilms_box_d_order (
    id VARCHAR,
    event_time TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost/mydb',
    'connector.table' = 'test',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'test',
    'connector.password' = 'test',
    'connector.property-version' = '1'
);

DML:
INSERT INTO console_test
SELECT
    t1.event_time,
    order_id
FROM josn_table
LEFT JOIN ilms_box_d_order FOR SYSTEM_TIME AS OF josn_table.proc_time AS t1
    ON josn_table.order_id = t1.id AND josn_table.event_time = t1.event_time;
When I execute this SQL, I get the following exception:
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp, field index: 1, field value: 2020-05-22T14:00.
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java#L236


Why don't we support LocalDateTime? 
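
For what it's worth, the two types convert losslessly with plain JDK calls; below is a minimal sketch of that bridge (plain JDK API only, not actual Flink connector code, and the class name is made up):

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampBridgeSketch {
    public static void main(String[] args) {
        // The value from the exception message above.
        LocalDateTime fromPlanner = LocalDateTime.parse("2020-05-22T14:00");

        // LocalDateTime -> java.sql.Timestamp, the type JdbcInputFormat works with in 1.10.
        Timestamp forJdbc = Timestamp.valueOf(fromPlanner);

        // And back again, without losing information.
        LocalDateTime roundTripped = forJdbc.toLocalDateTime();

        System.out.println(forJdbc + " <-> " + roundTripped);
    }
}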


Best wishes.
forideal







Flink 1.10 local mode (how does DefaultActionSuspension.resume() get invoked? I want to understand the call path)

2020-05-22 Thread  
In Flink 1.10 local mode, the method org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.queueChannel calls toNotify.complete(null), which directly invokes org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.DefaultActionSuspension.resume(). How does that resume() end up being called? I would like to understand the call chain.
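
As a rough illustration of the mechanism (plain JDK CompletableFuture, not a trace of Flink's actual classes): a dependent action is registered on an availability future, and completing that future runs the action synchronously in the completing thread, which is why complete(null) can show up directly above a resume()-style callback in the stack. A minimal sketch with a made-up class name:

import java.util.concurrent.CompletableFuture;

public class AvailabilityCallbackSketch {
    public static void main(String[] args) {
        CompletableFuture<Void> availability = new CompletableFuture<>();

        // Roughly analogous to the mailbox registering "resume the default action once input is available".
        availability.thenRun(() ->
                System.out.println("resume()-equivalent runs in thread: " + Thread.currentThread().getName()));

        // Roughly analogous to SingleInputGate.queueChannel() calling toNotify.complete(null):
        // the future was not yet complete, so the dependent action executes synchronously here.
        availability.complete(null);
    }
}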

Re: Writing to SQL server

2020-05-22 Thread Timo Walther

Hi Martin,

usually, this error occurs when people forget to add 
`org.apache.flink.api.scala._` to their imports. It is triggered by the 
Scala macro that the DataStream API uses for extracting types.


Can you try to call `result.toAppendStream[Row]` directly? This should 
work if you import `org.apache.flink.table.api.scala._`.


Maybe this example helps:

https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
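
If the Scala implicits keep getting in the way, the Java bridge API sidesteps the implicit TypeInformation evidence by taking the target class explicitly. A minimal sketch (Java rather than the Scala code quoted below; the class name is made up and it assumes "sql_source" was registered as in the original post):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ToAppendStreamJavaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Assumes the "sql_source" table has been registered exactly as in the original post.
        Table result = tableEnv.sqlQuery(
                "SELECT * FROM sql_source WHERE CustomDimensions['pagePath'] LIKE '%BT%'");

        // The Java bridge takes the class explicitly, so no implicit evidence parameter is needed.
        DataStream<Row> dsRow = tableEnv.toAppendStream(result, Row.class);

        dsRow.print();
        env.execute("to-append-stream-sketch");
    }
}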

Regards,
Timo


On 22.05.20 08:02, Martin Frank Hansen wrote:

Hi,

I am trying to write input from Kafka to a SQL server on AWS, but I have 
difficulties.


I get the following error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]

[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]                                           ^

Any help is appreciated

I am not sure whether my approach is correct or not but my code is 
as follows:


import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers", b_servers)
properties.setProperty("zookeeper.connect", zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm ", "")
properties.setProperty("security.protocol", "SSL")


val kafkaSource: FlinkKafkaConsumerBase[String] =
  new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties).setStartFromTimestamp(0)

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
  .field("fullVisitorId", Types.STRING)
  .field("eventTime", Types.STRING)
  .field("eventID", Types.STRING)
  .field("eventType", Types.STRING)
  .field("page", Types.MAP(Types.STRING, Types.STRING))
  .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))


tableEnv.connect(new Kafka()
    .version("universal")
    .topic("very_small_test")
    .properties(properties)
    .startFromEarliest()
  )
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema()
  )
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("sql_source")


val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"

val result = tableEnv.sqlQuery(sqlStatement)

val dsRow = tableEnv.toAppendStream[Row](result)


val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .setDBUrl("AWS url")
  .setUsername(username)
  .setPassword(password)
  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
  .setBatchInterval(100)
  .finish()

dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")


--

Best regards,
Martin Hansen





Writing to SQL server

2020-05-22 Thread Martin Frank Hansen
Hi,

I am trying to write input from Kafka to a SQL server on AWS, but I have
difficulties.

I get the following error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]   ^

Any help is appreciated

I am not sure whether my approach is correct or not but my code is
as follows:

import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,
FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table,
TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers",b_servers)
properties.setProperty("zookeeper.connect",zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm ", "")
properties.setProperty("security.protocol", "SSL")


val kafkaSource: FlinkKafkaConsumerBase[String] =
  new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties).setStartFromTimestamp(0)

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
.field("fullVisitorId",Types.STRING)
.field("eventTime",Types.STRING)
.field("eventID",Types.STRING)
.field("eventType",Types.STRING)
.field("page",Types.MAP( Types.STRING, Types.STRING))
.field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING))


tableEnv.connect(new Kafka()
.version("universal")
.topic("very_small_test")
.properties(properties)
.startFromEarliest()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(schema)
.inAppendMode()
.registerTableSource("sql_source")


val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"

val result = tableEnv.sqlQuery(sqlStatement)

val dsRow = tableEnv.toAppendStream[Row](result)


val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
.setDBUrl("AWS url")
.setUsername(username)
.setPassword(password)
.setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
.setBatchInterval(100)
.finish()

dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")


-- 



Best regards,
Martin Hansen


回复:Performance issue when writing to HDFS

2020-05-22 Thread Yun Gao
Hi Kong,

 Sorry, I'm not an expert on Hadoop, but from the logs and Google it looks more like a problem on the HDFS side [1], e.g. long GC pauses in a DataNode.

 I also found a similar issue in the mailing-list history [2], and the conclusion there was similar.

 Best,
 Yun


   [1] 
https://community.cloudera.com/t5/Support-Questions/Solution-for-quot-slow-readprocessor-quot-warnings/td-p/122046
   [2] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Slow-ReadProcessor-quot-warnings-when-using-BucketSink-td9427.html



 ------ Original message ------
From: Mu Kong 
Sent: Fri May 22 11:16:32 2020
To: user 
Subject: Performance issue when writing to HDFS

Hi all,

I have a Flink application consuming from Kafka and writing the data to HDFS, bucketed by event time with BucketingSink.
Sometimes the traffic gets high, and the Prometheus metrics show that the writing is not stable.

(getting from flink_taskmanager_job_task_operator_numRecordsOutPerSecond)

The output data on HDFS is also getting delayed. (The records for a certain 
hour bucket are written to HDFS 50 minutes after that hour)

I looked into the logs and found warnings regarding the DataNode ack, which might be related:

DFSClient exception:
2020-05-21 10:43:10,432 INFO  org.apache.hadoop.hdfs.DFSClient  - Exception in createBlockOutputStream
java.io.IOException: Got error, status message , ack with firstBadLink as :1004
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:140)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1478)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1380)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:558)

Slow ReadProcessor read fields warning:
2020-05-21 10:42:30,509 WARN  org.apache.hadoop.hdfs.DFSClient  - Slow ReadProcessor read fields took 30230ms (threshold=3ms); ack: seqno: 126 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 372753456 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[:1004,DS-833b175e-9848-453d-a222-abf5c05d643e,DISK], DatanodeInfoWithStorage[:1004,DS-f998208a-df7b-4c63-9dde-26453ba69559,DISK], DatanodeInfoWithStorage[:1004,DS-4baa6ba6-3951-46f7-a843-62a13e3a62f7,DISK]]


We haven't done any tuning for the Flink job regarding writing to HDFS. Is 
there any config or optimization we can try to avoid delay and these warnings?
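
On the Flink side, a minimal sketch of the BucketingSink knobs that control how often part files are rolled and idle buckets are closed (the path, format and threshold values are illustrative placeholders, not recommendations, and whether tuning them helps depends on the HDFS-side diagnosis above):

import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkTuningSketch {

    // Sketch of the sink-side settings that influence flushing/rolling behaviour.
    public static BucketingSink<String> buildSink() {
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///tmp/flink-output");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        sink.setWriter(new StringWriter<>());
        sink.setBatchSize(1024L * 1024L * 128L);        // roll a part file after ~128 MB
        sink.setBatchRolloverInterval(10 * 60 * 1000L); // ...or after 10 minutes
        sink.setInactiveBucketCheckInterval(60 * 1000L);
        sink.setInactiveBucketThreshold(5 * 60 * 1000L); // close buckets idle for 5 minutes
        return sink;
    }
}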

Thanks in advance!!

Best regards,
Mu

Example of combining a flink-table sink with sql-client

2020-05-22 Thread guaishushu1...@163.com
Hi all, is there any example of implementing a flink-table sink and using it together with the SQL client?



guaishushu1...@163.com
 
From: user-zh-digest-help
Date: 2020-05-21 21:45
To: user-zh
Subject: user-zh Digest 21 May 2020 13:45:23 - Issue 703
 
user-zh Digest 21 May 2020 13:45:23 - Issue 703
 
Topics (messages 3698 through 3702)
 
Re: How can Flink read files under HDFS using a regex
3698 by: jimandlice

flink proctime error
3699 by: 了不起的盖茨比
3700 by: Benchao Li
3701 by: Jingsong Li
3702 by: 了不起的盖茨比
 
 
--