Custom partitioner: problem when using it, code attached

2021-02-19 Thread Ye Chen
Hi all, I want to implement a custom partitioner. After extending FlinkKafkaPartitioner, using it fails to compile. Simplified code below.
// custom partitioner
public class customPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return 0;
    }
}


DataStream<String> stream = ...;
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "test_topic",
        new SimpleStringSchema(),
        properties,
        new customPartitioner()
);
stream.addSink(myProducer);


// The code above fails to compile in the IDE at the FlinkKafkaProducer call: [Error:(55, 49) java:
// cannot infer type arguments for org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>]
// If I remove new customPartitioner() and do not use the custom partitioner, FlinkKafkaProducer compiles fine.
// It feels like no constructor matches, but looking at the source, such a constructor does exist.




The FlinkKafkaProducer source is below. Is there something wrong with my usage?
public FlinkKafkaProducer(
        String topicId,
        SerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
        topicId,
        serializationSchema,
        producerConfig,
        customPartitioner.orElse(null),
        Semantic.AT_LEAST_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
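
For reference, here is a rough sketch of how the call could be made to line up with that constructor (this is my reading of the pasted signature, not a tested fix): the overload shown takes an Optional, so the partitioner needs to be wrapped in Optional.of(...), and declaring the partitioner class with an explicit <String> type parameter, as above, helps the compiler infer the producer's type argument.

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class CustomPartitionerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c"); // placeholder source

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        // This overload takes Optional<FlinkKafkaPartitioner<IN>>, so the custom
        // partitioner is wrapped in Optional.of(...) instead of being passed directly.
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "test_topic",
                new SimpleStringSchema(),
                properties,
                Optional.of(new customPartitioner()));

        stream.addSink(myProducer);
        env.execute("custom partitioner example");
    }
}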





Cep application with Flink

2021-02-19 Thread Люльченко Юрий Николаевич

Hi there.
 
I'm a newbie with Flink and I need to develop a CEP application using this
technology; at least, we are choosing a tool now. So my question is how well
Flink really fits my goals.
 
Business requirements:
 
There is a mobile application. Data from it is collected in three Kafka topics
(each with multiple partitions). These are not ready-made per-user events: first
I need to join the data from the topics, and only then do we have a good, clean
user event.
 
The order of events for each user matters.
 
So the rules can be like these: a user does action A, then B, and then for some
period does not do action C. If such a sequence is detected, the system
communicates with this user through different channels.
 
We don't want to use an external DB, only Flink state.
 
My questions are:
 
1. Using Kafka as the input stream and assembling an event per user.
 
I think the order of the clean user events will be wrong this way, because the
topics are not partitioned by user key and only one topic has this field. Can I
reorder these events by the event's time field?
 
2. State of events.
 
Can I query the state using SQL syntax? I don't want to iterate over all records
in the store in order to trigger a communication.
In the case described above (A -> B -> waiting period -> no C -> communication),
the B event is stored in state. If C is received, the system removes B from the
store. We need to query the store and get all B records with
B.event_time + waiting_period < now_time.
Or can the CEP library do this job with a pattern? (A rough sketch of one possible
approach follows after question 3.)
 
3. Maybe my framing of these requirements is not quite right. But again: can
Flink help realise this task?
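
For illustration only (not part of the original message): one rough way the "A, then B, then no C within some period" rule could be expressed is with a keyed timer, as an alternative or complement to a CEP pattern. The event class and field names below are assumptions, and the sketch presumes the stream is keyed by user id with event-time timestamps and watermarks assigned upstream.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Assumed event shape - not from the original message.
class Event {
    public String userId;
    public String action;   // "A", "B" or "C"
    public long eventTime;  // epoch millis
}

// Sketch only: per user, remember B once A was seen, and emit it if no C arrives within waitMillis.
public class AbsenceOfCDetector extends KeyedProcessFunction<String, Event, Event> {

    private final long waitMillis;
    private transient ValueState<Boolean> sawA;
    private transient ValueState<Event> pendingB;

    public AbsenceOfCDetector(long waitMillis) {
        this.waitMillis = waitMillis;
    }

    @Override
    public void open(Configuration parameters) {
        sawA = getRuntimeContext().getState(new ValueStateDescriptor<>("saw-a", Boolean.class));
        pendingB = getRuntimeContext().getState(new ValueStateDescriptor<>("pending-b", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        if ("A".equals(event.action)) {
            sawA.update(true);
        } else if ("B".equals(event.action) && Boolean.TRUE.equals(sawA.value())) {
            pendingB.update(event);                                                   // remember B
            ctx.timerService().registerEventTimeTimer(event.eventTime + waitMillis);  // start waiting
        } else if ("C".equals(event.action)) {
            pendingB.clear();                                                          // C arrived in time
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Event b = pendingB.value();
        if (b != null && timestamp >= b.eventTime + waitMillis) {
            out.collect(b);   // no C within the period -> trigger the communication downstream
            pendingB.clear();
        }
    }
}

Applied as events.keyBy(e -> e.userId).process(new AbsenceOfCDetector(10 * 60 * 1000L)), the collected records would be the B events that saw no C in time.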
 
Thanks,
Yuri L.

flink sql: Kafka source to MySQL sink shows append behavior on primary key conflict

2021-02-19 Thread Yu Wang
Kafka data format:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20
06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20
06:47:34.609"}


*kafka ddl :*
CREATE TABLE washroom_detail (
 building_id STRING,
 sofa_id STRING,
 floor_num INT,
 occupy_status INT,
 start_time BIGINT,
 end_time BIGINT,
 process_time TIMESTAMP,
 occupy_times as concat(
     date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'),
     '-',
     date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')),
 local_date as date_format(cast(start_time / 1000 as timestamp), 'yyyy-MM-dd'),
 day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8
) WITH (
 'connector' = 'kafka',
 'topic' = '',
 'properties.bootstrap.servers' = 'xxx',
 'properties.group.id' = '',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'json'
);


*mysql ddl:*
create table hour_ddl
(
    building_id      STRING,
    sofa_id          STRING,
    local_date       STRING,
    `hour`           INT,
    floor_num        INT,
    occupy_frequency INT,
    occupy_times     STRING,
    update_time      TIMESTAMP,
    process_time     TIMESTAMP,
    primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED
) with (
    'connector' = 'jdbc',
    'url' = '',
    'table-name' = '',
    'username' = 'x',
    'password' = 'xx'
)


*flink sql dml:*
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
   a.sofa_id,
   a.local_date,
   a.day_hour,
   a.floor_num,
   CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0,
b.occupy_frequency) AS INT),
   concat(if(b.occupy_times IS NULL, '', b.occupy_times),
if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
   NOW(),
   a.process_time
FROM
(SELECT building_id,
sofa_id,
local_date,
day_hour,
floor_num,
count(1) AS frequency,
LISTAGG(occupy_times) AS times,
MAX(process_time) AS process_time,
PROCTIME() AS compute_time
 FROM washroom_detail
 GROUP BY building_id,
  sofa_id,
  local_date,
  day_hour,
  floor_num) a
LEFT JOIN hour_ddl
FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
AND a.sofa_id = b.sofa_id
AND a.local_date = b.local_date
AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
OR b.process_time IS NULL

Observed behavior:
When MySQL has no data yet, one record is inserted:
occupy_frequency    occupy_times
1                   15:01-15:03
On a primary key conflict:
occupy_frequency    occupy_times
3                   15:01-15:03,15:01-15:03,15:03-15:04
Expected result:
occupy_frequency    occupy_times
2                   15:01-15:03,15:03-15:04


SqlValidatorException: No match found for function signature prod()

2021-02-19 Thread xiaoyue
I'm getting SqlValidatorException: No match found for function signature prod()
when using a UDAF with Flink SQL 1.11. Could someone please help take a look? _(:з」∠)_

The code is below:
-
...
  stableEnv.createTemporarySystemFunction("prod", 
ProductAggregateFunction.class);
  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as 
yldrate from queryData group by pf_id");
...
-
@FunctionHint(
input = @DataTypeHint("Double"),
output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction<Double, Product> {


@Override
public Double getValue(Product acc) {
return acc.prod;
}
@Override
public Product createAccumulator() {
return new Product();
}
public void accumulate(Product acc, Double iValue) {
acc.prod *= iValue;
}
public void retract(Product acc, Double iValue) {
acc.prod /= iValue;
}
public void merge(Product acc, Iterable<Product> it) {
for (Product p : it) {
accumulate(acc, p.prod);
}
}
public void resetAccumulator(Product acc) {
acc.prod = 1D;
}
}
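
For completeness: the accumulator class Product referenced above is not included in the message; a minimal version that would be consistent with the code (an assumption on my part) is:

// Assumed accumulator POJO: the function multiplies into 'prod', which starts at 1.
public static class Product {
    public Double prod = 1D;
}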

Re: How to ensure no data loss with a dual-stream (interval) join in Flink

2021-02-19 Thread Smile@LETTers
Use a left join or full join? That way, rows that find no match are still emitted
when the interval closes, with the fields from the other side filled with null.
Currently the DataStream API's Interval Join does not support outer joins, but the
Table API / SQL does; see [1].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#joins
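
For illustration (not from the original thread, with invented table and column names, and assuming both order_time and pay_time are rowtime attributes), an interval join with outer semantics in SQL could look roughly like this:

-- Illustrative sketch only: a full outer interval join. Rows with no match within
-- the time bound are emitted with NULLs for the other side once the interval closes.
SELECT o.order_id, o.order_time, p.pay_time
FROM orders o
FULL OUTER JOIN payments p
  ON o.order_id = p.order_id
 AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '1' HOUR;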
On 2021-02-08 19:05:56, "lxk7...@163.com" wrote:
>
>Currently we use Flink for dual-stream joins, mostly interval joins with a time interval chosen from experience. How can we make sure no data is lost?
>If data arrives later than that interval it is simply dropped, and since I am working with order data, that is not acceptable.
>
>
>lxk7...@163.com


Re: Question

2021-02-19 Thread Abu Bakar Siddiqur Rahman Rocky
Hi,

I read it:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html

I can run jobs through the Apache Flink web UI that ships with the Flink
binaries. If I run Java code from IntelliJ IDEA or Eclipse, how can I connect
that code to the Apache Flink UI?
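
One approach that might work, sketched below with assumed host, port, and jar path (it presumes a standalone cluster started with ./bin/start-cluster.sh and the job packaged into a jar beforehand), is to create a remote execution environment from the IDE; the submitted job then shows up in the web UI on localhost:8081. Submitting the jar with ./bin/flink run or through the web UI's submit page are the other usual options.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Host and REST port of the JobManager, plus the path of the packaged job jar
        // (all three values here are assumptions / placeholders).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", 8081, "target/my-job.jar");

        env.fromElements("hello", "flink")
           .map(String::toUpperCase)
           .print();

        env.execute("submitted from the IDE");
    }
}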

Thank you!

On Fri, Feb 12, 2021 at 11:43 AM Matthias Pohl 
wrote:

> Checkpoints are stored in some DFS storage. The location can be specified
> using state.checkpoints.dir configuration property [1]. You can access the
> state of a savepoint or checkpoint using the State Processor API [2].
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <
> bakar121...@gmail.com> wrote:
>
>> Thank you for your reply.
>>
>> Another Question:
>> After Checkpointing, we save our snapshot to a storage. How can we access
>> the storage?
>>
>> is this the source code:
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
>>
>> If it is not, could you please provide me the source code to access in
>> the storage where snapshots are saved?
>>
>> Thank you
>>
>>
>>
>>
>> On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Abu Bakar Siddiqur Rahman,
>>> Have you had a look at the Flink documentation [1]? It provides
>>> step-by-step instructions on how to run a job (the Flink binaries provide
>>> example jobs under ./examples) using a local standalone cluster. This
>>> should also work on a Mac. You would just need to start the Flink cluster
>>> (./bin/start-cluster.sh) and submit a job using one of the example jars
>>> provided in the binaries (e.g. ./bin/flink run -d
>>> ./examples/streaming/WordCount.jar). You can check the job running in
>>> Flink's web UI being available under http://localhost:8081 if you use
>>> the default configuration provided by the Flink binaries.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>>>
>>> On Thu, Feb 11, 2021 at 3:45 PM Abu Bakar Siddiqur Rahman Rocky <
>>> bakar121...@gmail.com> wrote:
>>>
 Hi,

 Is there anyone who can tell me how I can connect a Java program to
 Apache Flink (on a Mac)?

 Thank you!

 Regards,
 Abu Bakar Siddiqur Rahman

 On Thu, Feb 11, 2021 at 4:26 AM Abu Bakar Siddiqur Rahman Rocky <
 bakar121...@gmail.com> wrote:

> Hi Chesnay,
>
> Could you please tell me how I can connect a Java program to
> Apache Flink (on a Mac)?
>
> Thank you!
>
> Regards,
> Abu Bakar Siddiqur Rahman
>
> On Wed, Feb 3, 2021, 3:06 AM Chesnay Schepler 
> wrote:
>
>> Sure.
>>
>>
>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint
>>
>>
>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper
>>
>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper
>>
>> On 2/3/2021 3:08 AM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>
>> Hi,
>>
>> Is there any source code for the checkpoints, snapshot and zookeeper
>> mechanism?
>>
>> Thank you
>>
>> On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler 
>> wrote:
>>
>>> Could you expand a bit on what you mean? Are you referring to
>>> *savepoints*?
>>>
>>> On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>>
>>> Hi,
>>>
>>> Is there any library to use and remember the apache flink snapshot?
>>>
>>> Thank you
>>>
>>> --
>>> Regards,
>>> Abu Bakar Siddiqur Rahman
>>>
>>>
>>>
>>>
>>
>> --
>> Regards,
>> Abu Bakar Siddiqur Rahman
>> Graduate Research Student
>> Natural Language Processing Laboratory
>> Centro de Investigacion en Computacion
>> Instituto Politecnico Nacional, Mexico City
>>
>>
>>

 --
 Regards,
 Abu Bakar Siddiqur Rahman


>>
>> --
>> Regards,
>> Abu Bakar Siddiqur Rahman
>>
>>
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica 
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>


-- 
Regards,
Abu Bakar Siddiqur Rahman


Re: Flink problem

2021-02-19 Thread Guanghui Zhang
Could you clarify what should happen when another record is reported by userId 001
within the 10 minutes, for example buffer it, or keep only the latest one?

ゞ野蠻遊戲χ wrote on Fri, Feb 19, 2021 at 7:35 PM:

> hi all
>
>  For example, if a user message A (userId: 001) is reported, and no
> record is reported again by userId 001 within 10 minutes, record A will be
> sent out. How can this be achieved in Flink?
>
> Thanks
> Jiazhi
>
> --
> Sent from my iPhone
>


Re: Flink SQL temporal table join with Hive error

2021-02-19 Thread Leonard Xu

> 
> Case 2: the dimension table is partitioned, each partition contains only that day's data, and there is no primary key.
> 
>   In this case, because the join needs all of the data, we still have to set 'streaming-source.partition.include' = 'all'; but since there is still no primary key, it cannot run.
> 
> It is this second case I'm stuck on: the Hive dimension table is not maintained by me and many people use it, so I cannot modify it to add a primary key, and therefore cannot do the join.

For the second case, the Hive table is not read in streaming mode; it is effectively a static table, and the latest full snapshot is loaded each time, so the following options are enough:
  'streaming-source.enable' = 'false',           -- option with default value, can be omitted
  'streaming-source.partition.include' = 'all',  -- option with default value, can be omitted
  'lookup.join.cache.ttl' = '12 h'
'streaming-source.partition.include' = 'all' is the default value, so it can also be left out; see [1].
> 
> 
> Also, I see that event time joins are not yet supported; the exchange-rate example on the
> website joins on processing time, but that becomes a problem when backfilling yesterday's data.
> 
> I see that FLIP-132 mentions Event Time semantics. Is this something that will be supported later?

The Kafka connector already supports event time temporal joins, but Hive tables do not yet support declaring a watermark on them, so this is not supported for Hive yet.


Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#temporal-join-the-latest-table
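
For reference, a lookup join against such a Hive table could then look roughly like the following (table and column names are invented for illustration, and the probe side is assumed to have a processing-time attribute named proctime); see [1] for the documented form:

-- Illustrative sketch only: join each incoming row against the latest full
-- snapshot of the Hive table, reloaded according to lookup.join.cache.ttl.
SELECT o.order_id, o.amount, d.region_name
FROM orders AS o
JOIN dim_hive_table FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.region_id = d.region_id;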

> 
> 
> macia kk  wrote on Mon, Feb 8, 2021 at 6:53 PM:
> Hi Leonard,
> 
>   Could you please help take a look at this question on the Flink mailing list? It has been blocking me for a long time. Thanks.



Re: Flink problem

2021-02-19 Thread Xintong Song
What you're looking for might be Session Window[1].

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
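
For illustration (not from the original message, with an assumed record layout of (userId, timestamp)): an event-time session window with a 10-minute gap closes once no new record arrives for that user within 10 minutes, at which point the last record of the session can be emitted. A minimal sketch:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("001", 1_000L),   // placeholder records: (userId, event timestamp)
                Tuple2.of("001", 2_000L),
                Tuple2.of("002", 5_000L))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                    .withTimestampAssigner((record, ts) -> record.f1))
            .keyBy(record -> record.f0)
            .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
            .reduce((a, b) -> b)   // keep only the last record of the session
            .print();              // in the real job: send the record out instead

        env.execute("session window sketch");
    }
}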

On Fri, Feb 19, 2021 at 7:35 PM ゞ野蠻遊戲χ  wrote:

> hi all
>
>  For example, if a user message A (userId: 001) is reported, and no
> record is reported again by userId 001 within 10 minutes, record A will be
> sent out. How can this be achieved in Flink?
>
> Thanks
> Jiazhi
>
> --
> Sent from my iPhone
>


How to pass PROCTIME through an aggregate

2021-02-19 Thread Rex Fenley
Hello,

Using the table api, I have a CREATE DDL which adds a PROCTIME() column and
I need to use it deep (like 10 operators deep) in our plan. However, I
explicitly do not want to use it before that point.

The problem I'm running into is that with any GROUP BY plus UDAF, I lose the
proctime column. I do not want to group by proctime there (it is unwindowed at
that point), nor do I want to aggregate over proctime; I just want to pass the
proctime column through until later, when I actually use it in a window.

My question is, how do I forward proctime through group-by UDAFs without
interfering with my plan?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: How do I increase number of db connections of the Flink JDBC Connector?

2021-02-19 Thread Li Peng
Ah got it, thanks!

On Thu, Feb 18, 2021 at 10:53 PM Chesnay Schepler 
wrote:

> Every worker uses exactly one connection, so in order to increase the
> number of connections you must indeed increase the worker parallelism.
>
> On 2/19/2021 6:51 AM, Li Peng wrote:
> > Hey folks,
> >
> > I'm trying to use flink to write high throughput incoming data to a
> > SQL db using the JDBC Connector as described here:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html>
>
> >
> >
> > However, after enabling this, my data consumption rate slowed down to
> > a crawl. After doing some digging, it seemed like the number of
> > connections to the db was very low, almost 1 connection per worker
> > (there are 9 workers). Which seemed like the culprit if Flink was
> > blocking and waiting for a single db connection to do all this work?
> >
> > Is there a way to tell the JDBC connector to use more db connections?
> > Or do I need to specifically increase the parallelism of the connector
> > to beyond the default?
> >
> > And if I need to increase the parallelism, currently my setup is
> > having 9 workers, 1 task slot for each (this cluster is dedicated to
> > one job). That means I have to increase the number of task slots
> > before increasing parallelism, right?
> >
> > My flink version is 1.10.1 and my jdbc connection
> > is flink-connector-jdbc_2.11:1.11.0.
> >
> > Thanks!
> > Li
>
>
>


Re: How is proctime represented?

2021-02-19 Thread Rex Fenley
Thanks yall this is really helpful!

On Fri, Feb 19, 2021 at 2:40 AM Timo Walther  wrote:

> Chesnay is right. The PROCTIME() is lazy evaluated and executed when its
> result is needed as an argument for another expression or function. So
> within the pipeline the column is NULL but when you want to compute
> something e.g. CAST(proctime AS TIMESTAMP(3)) it will be materialized
> into the row. If you want to use ingestion time, you should be able to use:
>
> CREATE TABLE (
>ingest_ts AS CAST(PROCTIME() AS TIMESTAMP(3))
> )
>
> Regards,
> Timo
>
>
> On 19.02.21 10:23, Chesnay Schepler wrote:
> > hmm...I can now see where that uncertainty comes from.
> >
> > My /impression/ is that PROCTIME is not evaluated eagerly, and instead
> > operators relying on this column generate their own processing
> > timestamp. What throws me off is that I cannot tell how you would tell
> > Flink to store a processing timestamp as is in a row (to essentially
> > create something like ingestion time).
> >
> > I'm looping in Timo to provide some clarity.
> >
> > On 2/19/2021 8:39 AM, Rex Fenley wrote:
> >> Reading the documentation you posted again after posting this
> >> question, it does sound like it's simply a placeholder that only gets
> >> filled in when used by an operator, then again, that's still not
> >> exactly what it says so I only feel 70% confident like that's what is
> >> happening.
> >>
> >> On Thu, Feb 18, 2021 at 10:55 PM Chesnay Schepler  >> > wrote:
> >>
> >> Could you check whether this answers your question?
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time
> >> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time
> >
> >>
> >> On 2/19/2021 7:29 AM, Rex Fenley wrote:
> >>> Hello,
> >>>
> >>> When using PROCTIME() in CREATE DDL for a source, is the proctime
> >>> attribute a timestamp generated at the time of row ingestion at
> >>> the source and then forwarded through the graph execution, or is
> >>> proctime attribute a placeholder that says "fill me in with a
> >>> timestamp" once it's being used directly by some operator, by
> >>> some machine?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>>
> >>> Rex Fenley|Software Engineer - Mobile and Backend
> >>>
> >>>
> >>> Remind.com | BLOG
> >>>  | FOLLOW US
> >>>  | LIKE US
> >>> 
> >>>
> >>
> >>
> >>
> >> --
> >>
> >> Rex Fenley|Software Engineer - Mobile and Backend
> >>
> >>
> >> Remind.com | BLOG  |
> >> FOLLOW US  | LIKE US
> >> 
> >>
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: State Access Beyond RichCoFlatMapFunction

2021-02-19 Thread Sandeep khanzode
Hello,

Is there an example setup of Queryable State for a Local Embedded Environment?

I am trying to execute Flink programs from within IntelliJ. Any help would be 
appreciated!

Even if not, if there are other examples where QueryableState can be executed 
in a standalone cluster, that would also be good help. Thanks.
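
I don't have a complete example at hand, but a rough, untested sketch of what a local setup might look like is below (names, the job id, port, and key are placeholders; it assumes flink-queryable-state-runtime is on the classpath of the job started from the IDE, and flink-queryable-state-client-java on the client side):

// --- In the job: start a local environment with the queryable-state proxy enabled ---
Configuration config = new Configuration();
config.setBoolean("queryable-state.enable", true);   // starts the proxy in the local MiniCluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);

// ... inside open() of the keyed function, mark the state as queryable:
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my-state", Types.LONG);
descriptor.setQueryable("my-query-name");

// --- In a separate client program: query the running job ---
QueryableStateClient client = new QueryableStateClient("localhost", 9069);  // default proxy port
CompletableFuture<ValueState<Long>> future = client.getKvState(
        JobID.fromHexString("00000000000000000000000000000000"),  // placeholder: id of the running job
        "my-query-name",
        "some-key",            // key of the keyed stream
        Types.STRING,          // key type of the keyed stream
        descriptor);
System.out.println(future.get().value());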


> On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:
> 
> (a) It is by design. For keyed state, you can only access the state for that key, 
> not for other keys. If you want one value per key, ValueState fits better 
> than MapState.
> (b) state-processor-api aims to access/create/modify/upgrade offline 
> savepoint but not running state. Queryable state may meet your requirement, 
> but it is not actively developed for a while according to my observation and 
> still beta. 
> 
> Queryable state: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>  
> 
> 
> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai 
> ) wrote:
> 
>> Hello,
>> 
>> I am creating a class that extends RichCoFlatMapFunction. I need to 
>> connect() two streams to basically share the state of one stream in another. 
>> 
>> This is what I do:
>> private transient MapState<KeyClass, ValueClass> state;
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>> new MapStateDescriptor<>("abc-saved-state",
>> Types.POJO(KeyClass.class), 
>> Types.POJO(ValueClass.class));
>> state = getRuntimeContext().getMapState(stateDescriptor);
>> 
>> This works correctly.
>> 
>> 
>> I have two questions:
>> (a) Whenever I debug, I can only see the current key in the MapState, not 
>> all the possible keys that were created before and saved. Next time, I get a 
>> hit for another key, I will only see the other key and not the rest of 
>> previous keys. Is it by design or am I missing something?
>> 
>> (b) Can I somehow access this state beyond the class that holds the state? 
>> I.e. can I access the state in some other class? If not, can I use the 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>  
>> 
>>  to do this? Is that the correct way to access the running state of one 
>> stream elsewhere in the program without corrupting it?
>> 
>> 
>> Your response will be greatly appreciated. I will be happy to add more 
>> details if required.
>> 
>> Thanks,
>> Sandeep Ramesh Khanzode



Flink problem

2021-02-19 Thread ゞ野蠻遊戲χ
hi all


  For example, if a user message A (userId: 001) is reported, 
and no record is reported again by userId 001 within 10 minutes, record A will 
be sent out. How can this be achieved in Flink?


Thanks
Jiazhi



Sent from my iPhone

Re: Timezone of log timestamps for Flink on k8s

2021-02-19 Thread Michael Ran
That is set at the k8s level.
On 2021-02-19 09:37:28, "casel.chen" wrote:
>Currently the log timestamps are in the UTC timezone. How can I set them to the local UTC+8 timezone? Thanks!
>
>
>2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
>2021-02-19 01:34:22,155 INFO akka.remote.Remoting [] - Starting remoting


Re: Can Flink connect to two Kafka clusters with independent Kerberos authentication at the same time?

2021-02-19 Thread 占英华
I don't think that works. Kerberos normally handles this through cross-realm trust, and since your environment cannot establish trust, there is no way around it.


> On Feb 19, 2021, at 18:33, liwei li wrote:
> 
> Can Flink consume data from a Kerberos-enabled Kafka cluster and then send it to another Kerberos-authenticated Kafka cluster, where the source and target Kerberos realms are independent and cannot establish cross-realm trust?
>  Many thanks! liwei li (email: hilili...@gmail.com)


Re: How is proctime represented?

2021-02-19 Thread Timo Walther
Chesnay is right. The PROCTIME() is lazy evaluated and executed when its 
result is needed as an argument for another expression or function. So 
within the pipeline the column is NULL but when you want to compute 
something e.g. CAST(proctime AS TIMESTAMP(3)) it will be materialized 
into the row. If you want to use ingestion time, you should be able to use:


CREATE TABLE (
  ingest_ts AS CAST(PROCTIME() AS TIMESTAMP(3))
)

Regards,
Timo


On 19.02.21 10:23, Chesnay Schepler wrote:

hmm...I can now see where that uncertainty comes from.

My /impression/ is that PROCTIME is not evaluated eagerly, and instead 
operators relying on this column generate their own processing 
timestamp. What throws me off is that I cannot tell how you would tell 
Flink to store a processing timestamp as is in a row (to essentially 
create something like ingestion time).


I'm looping in Timo to provide some clarity.

On 2/19/2021 8:39 AM, Rex Fenley wrote:
Reading the documentation you posted again after posting this 
question, it does sound like it's simply a placeholder that only gets 
filled in when used by an operator, then again, that's still not 
exactly what it says so I only feel 70% confident like that's what is 
happening.


On Thu, Feb 18, 2021 at 10:55 PM Chesnay Schepler > wrote:


Could you check whether this answers your question?


https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time



On 2/19/2021 7:29 AM, Rex Fenley wrote:

Hello,

When using PROCTIME() in CREATE DDL for a source, is the proctime
attribute a timestamp generated at the time of row ingestion at
the source and then forwarded through the graph execution, or is
proctime attribute a placeholder that says "fill me in with a
timestamp" once it's being used directly by some operator, by
some machine?

Thanks!

-- 


Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG
 | FOLLOW US
 | LIKE US






--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG  | 
FOLLOW US  | LIKE US 









Can Flink connect to two Kafka clusters with independent Kerberos authentication at the same time?

2021-02-19 Thread liwei li
Can Flink consume data from a Kerberos-enabled Kafka cluster and then send it to another Kerberos-authenticated Kafka cluster, where the source and target Kerberos realms are independent and cannot establish cross-realm trust?
Many thanks! liwei li (email: hilili...@gmail.com)

Flink SQL timestamp serialization problem

2021-02-19 Thread guaishushu1...@163.com
Flink 1.12.0: defining a TIMESTAMP(3) column in SQL runs into a time parsing problem.

CREATE TABLE user_log1 (
    user_id string,
    ts TIMESTAMP(3),
    proc_time as PROCTIME()
) WITH (

Caused by: java.io.IOException: Failed to deserialize JSON 
'{"user_id":"1188","ts":"2021-02-19T17:52:20.921Z"}'.
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema


guaishushu1...@163.com


Re: Kafka SQL Connector: dropping events if more partitions than source tasks

2021-02-19 Thread Jan Oelschlegel
If I increase the watermark delay, the number of dropped events goes down. But why
does the DataStream API job work fine with the same 12-hour watermark delay?

By the way, I'm using Flink 1.11. It would be nice if someone could give me
some advice.

Best,
Jan

From: Jan Oelschlegel 
Sent: Thursday, February 18, 2021 09:51
To: Jan Oelschlegel ; user 

Subject: Re: Kafka SQL Connector: dropping events if more partitions than 
source tasks

By  using the DataStream API with the same business logic I'm getting no 
dropped events.

From: Jan Oelschlegel 
mailto:oelschle...@integration-factory.de>>
Sent: Wednesday, February 17, 2021 19:18
To: user mailto:user@flink.apache.org>>
Subject: Kafka SQL Connector: dropping events if more partitions than source 
tasks

Hi,

I have a question regarding the Flink SQL connector for Kafka. I have 3 Kafka 
partitions and 1 Kafka SQL source connector (parallelism 1). The data within 
each Kafka partition is sorted by an event-time field, which is also my 
event-time in Flink. My watermark is generated with a delay of 12 hours:

WATERMARK FOR eventtime as eventtime - INTERVAL '12' HOUR


But the problem is that in Prometheus I see events being dropped for arriving late.
With a parallelism of 3 there are no drops.

Do I always have to have as many source tasks as I have Kafka partitions?



Best,
Jan


Re: How is proctime represented?

2021-02-19 Thread Chesnay Schepler

hmm...I can now see where that uncertainty comes from.

My /impression/ is that PROCTIME is not evaluated eagerly, and instead 
operators relying on this column generate their own processing 
timestamp. What throws me off is that I cannot tell how you would tell 
Flink to store a processing timestamp as is in a row (to essentially 
create something like ingestion time).


I'm looping in Timo to provide some clarity.

On 2/19/2021 8:39 AM, Rex Fenley wrote:
Reading the documentation you posted again after posting this 
question, it does sound like it's simply a placeholder that only gets 
filled in when used by an operator, then again, that's still not 
exactly what it says so I only feel 70% confident like that's what is 
happening.


On Thu, Feb 18, 2021 at 10:55 PM Chesnay Schepler > wrote:


Could you check whether this answers your question?


https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time



On 2/19/2021 7:29 AM, Rex Fenley wrote:

Hello,

When using PROCTIME() in CREATE DDL for a source, is the proctime
attribute a timestamp generated at the time of row ingestion at
the source and then forwarded through the graph execution, or is
proctime attribute a placeholder that says "fill me in with a
timestamp" once it's being used directly by some operator, by
some machine?

Thanks!

-- 


Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG
 | FOLLOW US
 | LIKE US






--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG  | 
FOLLOW US  | LIKE US 







Re: Flink 1.11.3 not able to restore with savepoint taken on Flink 1.9.3

2021-02-19 Thread Tzu-Li (Gordon) Tai
Hi,

I'm not aware of any breaking changes in the savepoint formats from 1.9.3 to
1.11.3.

Let's first try to rule out any obvious causes of this:
- Were any data types / classes that were used in state changed across the
restores? Remember that keys types are also written as part of state
snapshots.
- Did you register any Kryo types in the 1.9.3 execution, had changed those
configuration across the restores?
- Was unaligned checkpointing enabled in the 1.11.3 restore?

As of now it's a bit hard to debug this with just an EOFException, as the
corrupted read could have happened anywhere before that point. If it's
possible to reproduce a minimal job of yours that has the same restore
behaviour, that could also help a lot.

Thanks,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: latency related to the checkpointing mode EXACTLY ONCE

2021-02-19 Thread Tan, Min
Many thanks for your quick response.

Is the read_committed setting for the Kafka consumers required for exactly-once
semantics (EOS)? Is there no exactly-once guarantee if we read uncommitted messages?

Regards,
Min

From: Chesnay Schepler 
Sent: Thursday, February 18, 2021 8:27 PM
To: Tan, Min ; user 
Subject: [External] Re: latency related to the checkpointing mode EXACTLY ONCE

Yes, if you are only reading committed data then it will take at least the 
checkpoint interval for the data to become available to downstream consumers.
On 2/18/2021 6:17 PM, Tan, Min wrote:
Hi,

We use the checkpointing mode EXACTLY ONCE for some of our flink jobs.

I wonder how the checkpoint configuration, especially the checkpoint interval, 
relates to the end-to-end latency.

We need to set read_committed to true for the Kafka consumers.

Does this lead to a latency for one Flink job that is greater than the checkpoint 
interval?

Thank you very much for your help in advance.

Min




Re: Tag flink metrics to job name

2021-02-19 Thread Chesnay Schepler

hmm...in a roundabout way this could be possible I suppose.

For a given job, search through your metrics for some job metric (like 
numRestarts on the JM, or any task metric for TMs), and from that you 
should be able to infer the JM/TM that belongs to that (based on the TM 
ID / host information in the metric).
Conversely, when you see high cpu usage in one of the metrics for a 
JM/TM, search for a job metric for that same process.


On 2/19/2021 9:14 AM, bat man wrote:
Is there a way I can look into say for a specific job what’s the cpu 
usage or memory usage of the yarn containers when multiple jobs are 
running on the same cluster.
Also, the issue am trying to resolve is I’m seeing high memory usage 
for one of the containers I want isolate the issue with one job and 
then investigate further.


Thanks,
Hemant

On Fri, 19 Feb 2021 at 12:18 PM, Chesnay Schepler > wrote:


No, Job-/TaskManager metrics cannot be tagged with the job name.
The reason is that this only makes sense for application clusters
(opposed to session clusters), but we don't differentiate between
the two when it comes to metrics.

On 2/19/2021 3:59 AM, bat man wrote:

I meant the Flink jobname. I’m using the below reporter -
||
|metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter|
Is there any way to tag job names to the task and job manager
metrics.

Thanks,
Hemant

On Fri, 19 Feb 2021 at 12:40 AM, Chesnay Schepler
mailto:ches...@apache.org>> wrote:

When you mean "job_name", are you referring to the Prometheus
concept of
jobs, of the one of Flink?

Which of Flink prometheus reporters are you using?

On 2/17/2021 7:37 PM, bat man wrote:
> Hello there,
>
> I am using prometheus to push metrics to prometheus and
then use
> grafana for visualization. There are metrics like
>
- flink_taskmanager_Status_JVM_CPU_Load, 
flink_taskmanager_Status_JVM_CPU_Load, flink_taskmanager_Status_JVM_CPU_Time

> etc which do not gives job_name. It is tied to an instance.
> When running multiple jobs in the same yarn cluster it is
possible
> that different jobs have yarn containers on the same
instance, in this
> case it is very difficult to find out which instance has
high CPU
> load, Memory usage etc.
>
> Is there a way to tag job_name to these metrics so that the
metrics
> could be visualized per job.
>
> Thanks,
> Hemant








Flink 1.11.3 not able to restore with savepoint taken on Flink 1.9.3

2021-02-19 Thread shravan
Hi,

We are trying to upgrade Flink from version 1.9.3 to 1.11.3. As part of the
upgrade testing, we are observing below exception when Flink 1.11.3 tries to
restore from a savepoint taken with Flink 1.9.3.

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_6ae2e79afadd77f926d57cdd7bfa1e1b_(1/8) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
at
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more



The Flink pipeline and its operators are the same.

On checking in Flink docs, savepoint restore is supported between 1.9.x and
1.11.x versions.

Please provide inputs on how to resolve this issue.

Regards,
Shravan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Tag flink metrics to job name

2021-02-19 Thread bat man
Is there a way to look at, say, the CPU or memory usage of the YARN containers
for a specific job when multiple jobs are running on the same cluster?
Also, the issue I'm trying to resolve is that I'm seeing high memory usage for
one of the containers; I want to isolate the issue to one job and then
investigate further.

Thanks,
Hemant

On Fri, 19 Feb 2021 at 12:18 PM, Chesnay Schepler 
wrote:

> No, Job-/TaskManager metrics cannot be tagged with the job name.
> The reason is that this only makes sense for application clusters (opposed
> to session clusters), but we don't differentiate between the two when it
> comes to metrics.
>
> On 2/19/2021 3:59 AM, bat man wrote:
>
> I meant the Flink jobname. I’m using the below reporter -
>
>  metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
>
> Is there any way to tag job names to the task and job manager metrics.
>
> Thanks,
> Hemant
>
> On Fri, 19 Feb 2021 at 12:40 AM, Chesnay Schepler 
> wrote:
>
>> When you mean "job_name", are you referring to the Prometheus concept of
>> jobs, of the one of Flink?
>>
>> Which of Flink prometheus reporters are you using?
>>
>> On 2/17/2021 7:37 PM, bat man wrote:
>> > Hello there,
>> >
>> > I am using prometheus to push metrics to prometheus and then use
>> > grafana for visualization. There are metrics like
>> >
>> - flink_taskmanager_Status_JVM_CPU_Load, 
>> flink_taskmanager_Status_JVM_CPU_Load, flink_taskmanager_Status_JVM_CPU_Time
>>
>> > etc which do not gives job_name. It is tied to an instance.
>> > When running multiple jobs in the same yarn cluster it is possible
>> > that different jobs have yarn containers on the same instance, in this
>> > case it is very difficult to find out which instance has high CPU
>> > load, Memory usage etc.
>> >
>> > Is there a way to tag job_name to these metrics so that the metrics
>> > could be visualized per job.
>> >
>> > Thanks,
>> > Hemant
>>
>>
>>
>