Re: State Processor API and existing state

2021-06-28 Thread JING ZHANG
Hi Marco,
> I assume that all the data within the checkpoint are stored within the
given Savepoint. Is that assumption correct?
Yes
> I have not figured out how to correct / augment / fix the state though.
Can somebody please explain?
Please try the following approach:
1. Load the old savepoint file, creating Savepoint obj1
2. Read the state of the operator with UID Y from the Savepoint obj1
returned by step 1
3. Create a `BootstrapTransformation` via the entry point class
`OperatorTransformation`: bootstrap the new operator state with the dataset
returned by step 2, correcting or fixing the old state of operator Y in a
`StateBootstrapFunction` or `KeyedStateBootstrapFunction`
4. Load the old savepoint file again, creating Savepoint obj2
5. Drop the old operator with UID Y by calling `removeOperator` on the
Savepoint obj2 returned by step 4
6. Add a new operator with UID Y by calling `withOperator` on the Savepoint
obj2 returned by step 4; the first parameter is the uid (Y), the second
parameter is the `BootstrapTransformation` returned by step 3
7. Write out the Savepoint obj2 returned by step 6 to a new path

In this way, in the new savepoint files, the states of the operators with
UIDs W, X, and Z are intact; only the state of operator Y is updated.
Details about reading/writing/modifying savepoints can be found in the
documentation [1]; a minimal code sketch follows the link below.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
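
For reference, a minimal sketch of steps 1-7 (assumptions: the Flink 1.13
State Processor API; operator Y holds keyed state; KeyedRow, MyReader and
MyFixer are placeholder names for your own POJO, KeyedStateReaderFunction
and KeyedStateBootstrapFunction):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// steps 1-2: load the old savepoint and read operator Y's keyed state
ExistingSavepoint obj1 =
    Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend());
DataSet<KeyedRow> oldState = obj1.readKeyedState("Y", new MyReader());

// step 3: correct/fix the rows, then bootstrap them into a new operator state
BootstrapTransformation<KeyedRow> fixed = OperatorTransformation
    .bootstrapWith(oldState)           // optionally map/filter the rows first
    .keyBy(row -> row.key)
    .transform(new MyFixer());

// steps 4-7: load again, drop the old operator Y, attach the fixed one,
// and write everything out to a new savepoint path
Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend())
    .removeOperator("Y")
    .withOperator("Y", fixed)
    .write("s3://savepoint-2");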

Best regards,
JING ZHANG

Marco Villalobos wrote on Tue, Jun 29, 2021 at 6:00 AM:

> Let's say that a job has operators with UIDs: W, X, Y, and Z, and uses
> RocksDB as a backend with checkpoint data URI s3://checkpoints
>
> Then I stop the job with a savepoint at s3://savepoint-1.
>
> I assume that all the data within the checkpoint are stored within the
> given Savepoint. Is that assumption correct?
>
> Then, how can I fix the state in operator with UID Y, but keep all the
> data in the other operators intact?
>
> I know how to bootstrap state with the state-processor API.
> I have not figured out how to correct / augment / fix the state though.
>
> Can somebody please explain?
>


Re: Savepoint failure with operation not found under key

2021-06-28 Thread Rainie Li
Thanks for the context Chesnay.
Yes, I sent both requests to the same JM.

Best regards
Rainie

On Mon, Jun 28, 2021 at 8:33 AM Chesnay Schepler  wrote:

> Ordinarily this happens because the status request is sent to a different
> JM than the one who received the request for creating a savepoint.
> The meta information for such requests is only stored locally on each JM
> and neither distributed to all JMs nor persisted anywhere.
>
> Did you send both requests (the one for creating a savepoint and the one
> for querying the status) to the same JM?
>
> On 6/26/2021 11:18 PM, Rainie Li wrote:
>
> Hi Flink Community,
>
> I found this error when I tried to create a savepoint for my flink job.
> It's in version 1.9.
> {
>
> "errors": [
> "Operation not found under key:
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
> ]
> }
>
> Here is the error from the JM log:
>
> 2021-06-21 06:49:50,195 ERROR 
> org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler
>   - Exception occurred in REST handler: Operation not found under key: 
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@56675052
> 2021-06-21 06:50:50,023 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> 47281 of job 8de31918b2f2d983eee0d7f988a8f95a expired before completing.
>
> Any idea what could cause it and how to find more debugging info?
>
> Any suggestions are appreciated.
>
> Thanks
>
> Best regards
>
> Rainie
>
>
>


Reply: Re: How to convert local to iso 8601 time in flink sql?

2021-06-28 Thread 1095193...@qq.com
Hi Xu,
 I use Elasticsearch as a sink. If a timestamp field is sent into Elasticsearch
without any timezone information, then it will be assumed to be UTC time
(Coordinated Universal Time) [1].

[1] 
https://www.elastic.co/blog/converting-local-time-to-iso-8601-time-in-elasticsearch


1095193...@qq.com
 
From: Leonard Xu
Sent: 2021-06-29 10:49
To: 1095193290
Cc: user
Subject: Re: How to convert local to iso 8601 time in flink sql?
Hi,

Unfortunately Flink SQL doesn't support the TIMESTAMP WITH TIME ZONE type
yet [1]; maybe you can try writing a UDF to convert the timestamp
'2021-06-29 09:00:00' field to a String (with a representation like
'2021-06-29T09:00:00+08:00').

And could you share your scenario for using the TIMESTAMP WITH TIME ZONE type?

[1]https://issues.apache.org/jira/browse/FLINK-20869
Best,
Leonard

On Jun 29, 2021, at 09:56, 1095193...@qq.com wrote:

Hi community,
Now I have a timestamp field with format 'yyyy-MM-dd HH:mm:ss', such as
'2021-06-29 09:00:00'. How can I convert this field to ISO 8601 time with an
offset, such as '2021-06-29T09:00:00+08:00'?
Thanks.



1095193...@qq.com



[DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-06-28 Thread JING ZHANG
When WindowAggregate works on a Changelog which contains update messages,
the UPDATE_BEFORE (UB) message may be dropped as a late message [1].

In order to handle late UB messages, users need to set *all* of the following
3 parameters (a sketch of setting them via the Table API follows the list):

(1) enable late fire by setting

table.exec.emit.late-fire.enabled : true

(2) set per record emit behavior for late records by setting

table.exec.emit.late-fire.delay : 0 s

(3) keep window state for extra time after window is fired by setting

table.exec.emit.allow-lateness : 1 h   // or table.exec.state.ttl : 1 h
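
For reference, today a user has to set all three from the Table API like
this (a sketch; `tEnv` is your StreamTableEnvironment and the keys are the
ones listed above):

import org.apache.flink.configuration.Configuration;

Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("table.exec.emit.late-fire.enabled", "true");
conf.setString("table.exec.emit.late-fire.delay", "0 s");
conf.setString("table.exec.emit.allow-lateness", "1 h");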


The solution has two disadvantages:

(1) Users may not realize that UB messages may be dropped as late events,
so they will not set the related parameters.

(2) When users look for a solution to the dropped-UB-messages problem, the
current approach is a bit inconvenient because they need to set all 3
parameters. Besides, some of the configurations have overlapping
functionality.


Now there are two proposals to simplify the 3 parameters a little.

(1) Users only need to set table.exec.emit.allow-lateness (just like the
behavior on DataStream, where users only need to set allowed lateness); the
framework could automatically set `table.exec.emit.late-fire.enabled` to
true and `table.exec.emit.late-fire.delay` to 0s.

In a later version, we would deprecate `table.exec.emit.late-fire.delay`
and `table.exec.emit.late-fire.enabled`.


(2) Users need to set `table.exec.emit.late-fire.enabled` to true and set
`table.exec.state.ttl`; the framework could automatically set
`table.exec.emit.late-fire.delay` to 0s.

In a later version, we would deprecate `table.exec.emit.late-fire.delay`
and `table.exec.emit.allow-lateness`.


Please let me know what you think about the issue.

Thank you.

[1] https://issues.apache.org/jira/browse/FLINK-22781


Best regards,
JING ZHANG


Re: How to convert local to iso 8601 time in flink sql?

2021-06-28 Thread Leonard Xu
Hi,

Unfortunately Flink SQL doesn't support the TIMESTAMP WITH TIME ZONE type
yet [1]; maybe you can try writing a UDF to convert the timestamp
'2021-06-29 09:00:00' field to a String (with a representation like
'2021-06-29T09:00:00+08:00').
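
A minimal sketch of such a UDF (assumption: the local timestamps are in
Asia/Shanghai, which matches the +08:00 offset in your example; adjust the
zone to your setup):

import org.apache.flink.table.functions.ScalarFunction;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class ToIso8601 extends ScalarFunction {
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // '2021-06-29 09:00:00' -> '2021-06-29T09:00:00+08:00'
    public String eval(String localTs) {
        return LocalDateTime.parse(localTs, INPUT)
                .atZone(ZoneId.of("Asia/Shanghai"))
                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }
}

// register it, then call TO_ISO8601(ts_field) in SQL:
// tEnv.createTemporarySystemFunction("TO_ISO8601", ToIso8601.class);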

And could you share your scenario for using the TIMESTAMP WITH TIME ZONE type?

[1]https://issues.apache.org/jira/browse/FLINK-20869 

Best,
Leonard

> On Jun 29, 2021, at 09:56, 1095193...@qq.com wrote:
> 
> Hi community,
> Now I have a timestamp field with format 'yyyy-MM-dd HH:mm:ss', such as
> '2021-06-29 09:00:00'. How can I convert this field to ISO 8601 time with
> an offset, such as '2021-06-29T09:00:00+08:00'?
> Thanks.
> 
> 1095193...@qq.com 


How to convert local to iso 8601 time in flink sql?

2021-06-28 Thread 1095193...@qq.com
Hi community,
Now I have a timestamp field with format 'yyyy-MM-dd HH:mm:ss', such as
'2021-06-29 09:00:00'. How can I convert this field to ISO 8601 time with an
offset, such as '2021-06-29T09:00:00+08:00'?
Thanks.



1095193...@qq.com


State Processor API and existing state

2021-06-28 Thread Marco Villalobos
Let's say that a job has operators with UIDs: W, X, Y, and Z, and uses
RocksDB as a backend with checkpoint data URI s3://checkpoints

Then I stop the job with a savepoint at s3://savepoint-1.

I assume that all the data within the checkpoint are stored within the
given Savepoint. Is that assumption correct?

Then, how can I fix the state in operator with UID Y, but keep all the data
in the other operators intact?

I know how to bootstrap state with the state-processor API.
I have not figured out how to correct / augment / fix the state though.

Can somebody please explain?


Re: Looking for example code

2021-06-28 Thread Thomas Raef
Thanks, I'll check them out.

Thomas J. Raef
Founder, WeWatchYourWebsite.com
http://wewatchyourwebsite.com
tr...@wewatchyourwebsite.com



On Mon, Jun 28, 2021 at 11:16 AM Piotr Nowojski 
wrote:

> Have you seen the documents that I linked? Isn't it enough?
>
> First Pulsar link that I posted [4] has some example code. Literally the
> first link inside the second pulsar blog I referenced [5] leads to the
> pulsar connector repository which also has some examples [6].
>
> Piotrek
>
> [6] https://github.com/streamnative/pulsar-flink/
>
> On Mon, Jun 28, 2021 at 17:08, Thomas Raef wrote:
>
>> I need it to connect to Pulsar and stream from Pulsar. I could not find
>> any code on how to connect to Pulsar. I've done the WordCount, but I need
>> sample code for how to connect to Pulsar.
>>
>> Thomas J. Raef
>> Founder, WeWatchYourWebsite.com
>> http://wewatchyourwebsite.com
>> tr...@wewatchyourwebsite.com
>>
>>
>>
>> On Mon, Jun 28, 2021 at 8:54 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> We are glad that you want to try out Flink, but if you would like to get
>>> help you need to be a bit more specific. What are you exactly doing, and
>>> what, on which step exactly and how is not working (including logs and/or
>>> error messages) is necessary for someone to help you.
>>>
>>> In terms of how to start, I would suggest starting with running the
>>> WordCount example on your cluster [1]. This one assumes starting a small
>>> cluster on your local machine. If you are interested in different methods
>>> of running Flink cluster check docs -> deployment -> resource providers [2].
>>>
>>> For developing a simple application and running it from your IDE please
>>> take a look at this Fraud Detection with the DataStream API example [3].
>>>
>>> Regarding the Pulsar connector, I don't know much about it, but there
>>> are a couple of resources that I have found via simple search, that might
>>> be helpful [4],[5]
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/local_installation/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/
>>> [4]
>>> https://flink.apache.org/news/2019/11/25/query-pulsar-streams-using-apache-flink.html
>>> [5] https://flink.apache.org/2021/01/07/pulsar-flink-connector-270.html
>>>
>>> On Fri, Jun 25, 2021 at 12:05, traef wrote:
>>>
 I'm just starting with Flink. I've been trying all the examples online
 and none of them work.

 I am not a Java programmer but have been programming since 1982.

 I would like example code to read from a Pulsar topic and output to
 another Pulsar topic.

 Pulsar version 2.8.0
 Flink version 1.13.1
 Scala version 2.11

 Thank you in advance.

>>>


Re: Savepoint failure with operation not found under key

2021-06-28 Thread Chesnay Schepler
Ordinarily this happens because the status request is sent to a 
different JM than the one who received the request for creating a savepoint.
The meta information for such requests is only stored locally on each JM 
and neither distributed to all JMs nor persisted anywhere.


Did you send both requests (the one for creating a savepoint and the one
for querying the status) to the same JM?
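
If you are going through the REST API directly, both calls need to hit the
same JM; a sketch (host, port, job id and the returned request-id are
placeholders):

# trigger; the response contains {"request-id": "..."}
curl -X POST -H "Content-Type: application/json" \
     -d '{"target-directory": "s3://...", "cancel-job": false}' \
     http://<same-jm>:8081/jobs/<job-id>/savepoints

# poll the status on the SAME JM, with the returned request-id
curl http://<same-jm>:8081/jobs/<job-id>/savepoints/<request-id>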


On 6/26/2021 11:18 PM, Rainie Li wrote:

Hi Flink Community,

I found this error when I tried to create a savepoint for my flink 
job. It's in version 1.9.

{
 "errors": [
 "Operation not found under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
 ]
}
Here is the error from the JM log:
2021-06-21 06:49:50,195 ERROR 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler
  - Exception occurred in REST handler: Operation not found under key: 
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@56675052
2021-06-21 06:50:50,023 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
47281 of job 8de31918b2f2d983eee0d7f988a8f95a expired before completing.
Any idea what could cause it and how to find more debugging info?
Any suggestions are appreciated.
Thanks
Best regards
Rainie





Re: Yarn Application Crashed?

2021-06-28 Thread Piotr Nowojski
Hi,

You should still be able to get the Flink logs via:

> yarn logs -applicationId application_1623861596410_0010

And it should give you more answers about what has happened.

About the Flink and YARN behaviour, have you seen the documentation? [1]
Especially this part:

> Failed containers (including the JobManager) are replaced by YARN. The
maximum number of JobManager container restarts is configured via
yarn.application-attempts (default 1). The YARN Application will fail once
all attempts are exhausted.

?
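
If you need YARN to retry the JobManager more than once, a sketch of the
relevant flink-conf.yaml entry (the value is illustrative; surviving JM
restarts without losing job state additionally requires an HA setup):

yarn.application-attempts: 3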

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/yarn/#flink-on-yarn-reference

On Mon, Jun 28, 2021 at 02:26, Thomas Wang wrote:

> Just found some additional info. It looks like one of the EC2 instances
> got terminated at the time the crash happened and this job had 7 Task
> Managers running on that EC2 instance. Now I suspect it's possible
> that when Yarn tried to migrate the Task Managers, there were no idle
> containers as this job was using like 99% of the entire cluster. However in
> that case shouldn't Yarn wait for containers to become available? I'm not
> quite sure how Flink would behave in this case. Could someone provide some
> insights here? Thanks.
>
> Thomas
>
> On Sun, Jun 27, 2021 at 4:24 PM Thomas Wang  wrote:
>
>> Hi,
>>
>> I recently experienced a job crash due to the underlying Yarn application
>> failing for some reason. Here is the only error message I saw. It seems I
>> can no longer see any of the Flink job logs.
>>
>> Application application_1623861596410_0010 failed 1 times (global limit
>> =2; local limit is =1) due to ApplicationMaster for attempt
>> appattempt_1623861596410_0010_01 timed out. Failing the application.
>>
>> I was running the Flink job using the Yarn session mode with the
>> following command.
>>
>> export HADOOP_CLASSPATH=`hadoop classpath` &&
>> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached
>>
>> I didn't have HA setup, but I believe the underlying Yarn application
>> caused the crash because if, for some reason, the Flink job failed, the
>> Yarn application should still survive. Please correct me if this is not the
>> right assumption.
>>
>> My question is how I should find the root cause in this case and what's
>> the recommended way to avoid this going forward?
>>
>> Thanks.
>>
>> Thomas
>>
>


Re: Looking for example code

2021-06-28 Thread Piotr Nowojski
Have you seen the documents that I linked? Isn't it enough?

First Pulsar link that I posted [4] has some example code. Literally the
first link inside the second pulsar blog I referenced [5] leads to the
pulsar connector repository which also has some examples [6].

Piotrek

[6] https://github.com/streamnative/pulsar-flink/

On Mon, Jun 28, 2021 at 17:08, Thomas Raef wrote:

> I need it to connect to Pulsar and stream from Pulsar. I could not find
> any code on how to connect to Pulsar. I've done the WordCount, but I need
> sample code for how to connect to Pulsar.
>
> Thomas J. Raef
> Founder, WeWatchYourWebsite.com
> http://wewatchyourwebsite.com
> tr...@wewatchyourwebsite.com
>
>
>
> On Mon, Jun 28, 2021 at 8:54 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> We are glad that you want to try out Flink, but if you would like to get
>> help you need to be a bit more specific. What are you exactly doing, and
>> what, on which step exactly and how is not working (including logs and/or
>> error messages) is necessary for someone to help you.
>>
>> In terms of how to start, I would suggest starting with running the
>> WordCount example on your cluster [1]. This one assumes starting a small
>> cluster on your local machine. If you are interested in different methods
>> of running Flink cluster check docs -> deployment -> resource providers [2].
>>
>> For developing a simple application and running it from your IDE please
>> take a look at this Fraud Detection with the DataStream API example [3].
>>
>> Regarding the Pulsar connector, I don't know much about it, but there are
>> a couple of resources that I have found via simple search, that might
>> be helpful [4],[5]
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/local_installation/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/
>> [4]
>> https://flink.apache.org/news/2019/11/25/query-pulsar-streams-using-apache-flink.html
>> [5] https://flink.apache.org/2021/01/07/pulsar-flink-connector-270.html
>>
>> On Fri, Jun 25, 2021 at 12:05, traef wrote:
>>
>>> I'm just starting with Flink. I've been trying all the examples online
>>> and none of them work.
>>>
>>> I am not a Java programmer but have been programming since 1982.
>>>
>>> I would like example code to read from a Pulsar topic and output to
>>> another Pulsar topic.
>>>
>>> Pulsar version 2.8.0
>>> Flink version 1.13.1
>>> Scala version 2.11
>>>
>>> Thank you in advance.
>>>
>>


Re: Looking for example code

2021-06-28 Thread Thomas Raef
I need it to connect to Pulsar and stream from Pulsar. I could not find any
code on how to connect to Pulsar. I've done the WordCount, but I need
sample code for how to connect to Pulsar.

Thomas J. Raef
Founder, WeWatchYourWebsite.com
http://wewatchyourwebsite.com
tr...@wewatchyourwebsite.com



On Mon, Jun 28, 2021 at 8:54 AM Piotr Nowojski  wrote:

> Hi,
>
> We are glad that you want to try out Flink, but if you would like to get
> help you need to be a bit more specific. What are you exactly doing, and
> what, on which step exactly and how is not working (including logs and/or
> error messages) is necessary for someone to help you.
>
> In terms of how to start, I would suggest starting with running the
> WordCount example on your cluster [1]. This one assumes starting a small
> cluster on your local machine. If you are interested in different methods
> of running Flink cluster check docs -> deployment -> resource providers [2].
>
> For developing a simple application and running it from your IDE please
> take a look at this Fraud Detection with the DataStream API example [3].
>
> Regarding the Pulsar connector, I don't know much about it, but there are
> a couple of resources that I have found via simple search, that might
> be helpful [4],[5]
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/local_installation/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/
> [4]
> https://flink.apache.org/news/2019/11/25/query-pulsar-streams-using-apache-flink.html
> [5] https://flink.apache.org/2021/01/07/pulsar-flink-connector-270.html
>
> On Fri, Jun 25, 2021 at 12:05, traef wrote:
>
>> I'm just starting with Flink. I've been trying all the examples online
>> and none of them work.
>>
>> I am not a Java programmer but have been programming since 1982.
>>
>> I would like example code to read from a Pulsar topic and output to
>> another Pulsar topic.
>>
>> Pulsar version 2.8.0
>> Flink version 1.13.1
>> Scala version 2.11
>>
>> Thank you in advance.
>>
>


Re: Cancel job error ! Interrupted while waiting for buffer

2021-06-28 Thread Piotr Nowojski
Hi,

It's hard to say from the log fragment, but I presume this task has
correctly switched to the "CANCELLED" state and this error should not have
been logged as an ERROR, right? How did you get this stack trace? Maybe it
was logged as a DEBUG message? If not, that would probably be a minor bug in
Flink; could you post a larger fragment of the log, including the stack
trace and the log line that printed it?

In short, this kind of exception is a normal thing to happen and expected
when cancelling a job. If your code is busy and blocked while being
backpressured (as your FlatMap operation was in this particular case),
interrupting the code is a standard thing that Flink is doing. However, it
shouldn't bubble up to the end user exactly for this reason - to not
confuse users.

> some TMs cancel successfully, some TMs get stuck in canceling and the TM
will be killed by itself with task.cancellation.timeout = 18

This part is a bit confusing to me. The above interruption should actually
prevent this timeout from kicking in, and the TM shouldn't be killed. Again,
can you post a larger part of the TM/JM logs, or even better, the full TM/JM
logs?

best,
Piotrek



On Sat, Jun 26, 2021 at 04:59, SmileSmile wrote:

> Hi
>
>    I use Flink 1.12.4 on YARN; the job topology is: kafka -> source ->
> flatmap -> window 1 min agg -> sink -> kafka. Checkpointing is enabled
> and the checkpoint interval is 20s. When I cancel my job, some TMs cancel
> successfully, while others get stuck in canceling and the TM kills itself
> with task.cancellation.timeout = 18. The TM log shows:
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74)
> [testFlink-1.0.jar:?]
> at
> com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29)
> [testFlink-1.0.jar:?]
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> [flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
> [flink-dist_2.11-1.12.4.jar:1.12.4]
>
> Caused by: java.io.IOException: Interrupted while waiting for buffer
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
> ~[testFlink-1.0.jar:?]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> ~[flink-dist_2.11-1.12.4.jar:1.12.4]
> at
> com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72)
> ~[testFlin

Re: Looking for example code

2021-06-28 Thread Piotr Nowojski
Hi,

We are glad that you want to try out Flink, but if you would like to get
help you need to be a bit more specific. What are you exactly doing, and
what, on which step exactly and how is not working (including logs and/or
error messages) is necessary for someone to help you.

In terms of how to start, I would suggest starting with running the
WordCount example on your cluster [1]. This one assumes starting a small
cluster on your local machine. If you are interested in different methods
of running Flink cluster check docs -> deployment -> resource providers [2].

For developing a simple application and running it from your IDE please
take a look at this Fraud Detection with the DataStream API example [3].

Regarding the Pulsar connector, I don't know much about it, but there are a
couple of resources that I have found via simple search, that might
be helpful [4],[5]

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/local_installation/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/
[4]
https://flink.apache.org/news/2019/11/25/query-pulsar-streams-using-apache-flink.html
[5] https://flink.apache.org/2021/01/07/pulsar-flink-connector-270.html
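
To make this concrete, here is a minimal read-and-print sketch based on the
streamnative connector's README (the URLs and topic are placeholders, and
the constructor signature is an assumption that may differ between connector
versions; please double-check against the connector repository):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("topic", "persistent://public/default/topic-in");

FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
        "pulsar://localhost:6650",   // broker service url
        "http://localhost:8080",     // admin url
        PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
        props);

env.addSource(source).print();
env.execute("pulsar-read");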

On Fri, Jun 25, 2021 at 12:05, traef wrote:

> I'm just starting with Flink. I've been trying all the examples online and
> none of them work.
>
> I am not a Java programmer but have been programming since 1982.
>
> I would like example code to read from a Pulsar topic and output to
> another Pulsar topic.
>
> Pulsar version 2.8.0
> Flink version 1.13.1
> Scala version 2.11
>
> Thank you in advance.
>


Re: FW: Hadoop3 with Flink

2021-06-28 Thread Yangze Guo
Sorry for the belated reply. In 1.12, you just need to make sure that
the HADOOP_CLASSPATH environment variable is set up. For more details,
please refer to [1].
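
For example, before starting or submitting to YARN (assuming the hadoop CLI
of your Hadoop 3 installation is on the PATH):

export HADOOP_CLASSPATH=`hadoop classpath`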

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/

Best,
Yangze Guo

On Mon, Jun 28, 2021 at 5:11 PM V N, Suchithra (Nokia - IN/Bangalore)
 wrote:
>
> Hi,
>
> Can anyone please share inputs on this?
>
>
>
> Regards,
>
> Suchithra
>
>
>
> From: V N, Suchithra (Nokia - IN/Bangalore)
> Sent: Thursday, June 24, 2021 2:35 PM
> To: user@flink.apache.org
> Subject: Hadoop3 with Flink
>
>
>
> Hello,
>
>
>
> We are using Apache Flink 1.12.3 and are planning to use Hadoop 3. Could
> you please suggest how to use Hadoop 3 with the Flink distribution?
>
>
>
> Regards,
>
> Suchithra
>
>
>
>


Re: Event Time Timers in Process functions when time characteristic is ProcessingTime

2021-06-28 Thread JING ZHANG
Hi lenduha,
> Processing time: we can only use processing time windows & timers.
Event time: we are able to use both processing time & event time
windows & timers. Processing time is always wall/clock time depending
on the host running the task manager, so it is independent from the event time?

Yes.

> Also, Kafka seems to support timestamps/watermarks; however, is
there any source not supporting timestamps/watermarks, and are there
possible solutions to that, like a custom watermark generator?

Yes, there are two places in Flink where a WatermarkStrategy can be used [1]:
1) directly on sources, like the Kafka connector you mentioned.
2) after non-source operations.
The first option is preferable, because it allows sources to exploit
knowledge about shards/partitions/splits in the watermarking logic, which is
more accurate.
But if a source does not have such an ability, you can use the second option.
Please check document [1] for more details about options 1 and 2.
Please note, document [1] is based on Flink 1.13 and later versions;
please check document [2] if you use an older version.
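
A minimal sketch of option 2 (MyEvent and its timestamp getter are
placeholders for your own event type; stream is a DataStream<MyEvent>):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

DataStream<MyEvent> withTimestamps = stream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.getTimestampMillis()));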

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html

Best regards,
JING ZHANG

Deniz Koçak wrote on Mon, Jun 28, 2021 at 5:06 PM:

> Thanks for the info Jing. Just to clarify one point: when we set
> the environment time characteristic to
>
> Processing time: we can only use processing time windows & timers.
> Event time: we are able to use both processing time & event time
> windows & timers. Processing time is always wall/clock time depending
> on the host running the task manager, so it is independent from the
> event time?
>
> Also, Kafka seems to support timestamps/watermarks; however, is
> there any source not supporting timestamps/watermarks, and are there
> possible solutions to that, like a custom watermark generator?
>
> Thanks,
>
> On Mon, Jun 28, 2021 at 8:25 AM JING ZHANG  wrote:
> >
> > Hi lenduha,
> >
> > > When setting the time characteristic to ProcessingTime via the
> setStreamTimeCharacteristic(...) call, I cannot see watermarks in the Flink
> UI.
> >
> > The watermark would be swallowed in this case.
> >
> > > can I use event time timers even if I set the
> > time characteristic to ProcessingTime (via
> > ctx.timerService().registerEventTimeTimer(...))?
> >
> > I'm afraid not. Event time timers cannot work after setting
> ProcessingTime via setStreamTimeCharacteristic,
> > because the watermark is swallowed in that case, so the event time
> timers would never be triggered.
> >
> > > when I set the time characteristic to EventTime, can I use both
> > processing time timers & event time timers without any problem?
> >
> > Event time timers could work, of course. Besides, explicitly using
> processing-time windows and timers works in event-time mode.
> > Please note that in Flink 1.12, the default stream time characteristic
> has been changed to EventTime [1].
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/release-notes/flink-1.12/#flip-134-batch-execution-for-the-datastream-api
> >
> > Best regards,
> > JING ZHANG
> >
> > Deniz Koçak wrote on Mon, Jun 28, 2021 at 2:43 PM:
> >>
> >> Hi,
> >>
> >> In the environment configuration, when setting the time characteristic to
> >> ProcessingTime via the setStreamTimeCharacteristic(...) call, I cannot see
> >> watermarks in the Flink UI. I think this is expected, because
> >> watermarks are disabled in the source (using Kafka as the source)?
> >>
> >> Another point here is: can I use event time timers even if I set the
> >> time characteristic to ProcessingTime (via
> >> ctx.timerService().registerEventTimeTimer(...))? As far as I
> >> understood from the documentation, event time timers need watermarks,
> >> which set the operator time, so I wonder if I can use event time
> >> timers in case ProcessingTime is selected in the environment? Also,
> >> when I set the time characteristic to EventTime, can I use both
> >> processing time timers & event time timers without any problem?
> >>
> >> Thanks,
>


FW: Hadoop3 with Flink

2021-06-28 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi,
Can anyone please share inputs on this?

Regards,
Suchithra

From: V N, Suchithra (Nokia - IN/Bangalore)
Sent: Thursday, June 24, 2021 2:35 PM
To: user@flink.apache.org
Subject: Hadoop3 with Flink

Hello,

We are using Apache flink 1.12.3 and planning to use Hadoop 3 version. Could 
you please suggest how to use Hadoop 3 with flink distribution.

Regards,
Suchithra




Re: Event Time Timers in Process functions when time characteristic is ProcessingTime

2021-06-28 Thread Deniz Koçak
Thanks for the info Jing. Just to clarify one point: when we set
the environment time characteristic to

Processing time: we can only use processing time windows & timers.
Event time: we are able to use both processing time & event time
windows & timers. Processing time is always wall/clock time depending
on the host running the task manager, so it is independent from the
event time?

Also, Kafka seems to support timestamps/watermarks; however, is
there any source not supporting timestamps/watermarks, and are there
possible solutions to that, like a custom watermark generator?

Thanks,

On Mon, Jun 28, 2021 at 8:25 AM JING ZHANG  wrote:
>
> Hi lenduha,
>
> > When setting the time characteristic to ProcessingTime via the
> > setStreamTimeCharacteristic(...) call, I cannot see watermarks in the
> > Flink UI.
>
> The watermark would be swallowed in this case.
>
> > can I use event time timers even if I set the
> time characteristic to ProcessingTime (via
> ctx.timerService().registerEventTimeTimer(...))?
>
> I'm afraid not. Event time timers cannot work after setting
> ProcessingTime via setStreamTimeCharacteristic,
> because the watermark is swallowed in that case, so the event time
> timers would never be triggered.
>
> > when I set the time characteristic to EventTime, can I use both
> processing time timers & event time timers without any problem?
>
> Event time timers could work, of course. Besides, explicitly using 
> processing-time windows and timers works in event-time mode.
> Please note that in Flink 1.12, the default stream time characteristic has
> been changed to EventTime [1].
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/release-notes/flink-1.12/#flip-134-batch-execution-for-the-datastream-api
>
> Best regards,
> JING ZHANG
>
> Deniz Koçak wrote on Mon, Jun 28, 2021 at 2:43 PM:
>>
>> Hi,
>>
>> In the environment configuration, when setting the time characteristic to
>> ProcessingTime via the setStreamTimeCharacteristic(...) call, I cannot see
>> watermarks in the Flink UI. I think this is expected, because
>> watermarks are disabled in the source (using Kafka as the source)?
>>
>> Another point here is: can I use event time timers even if I set the
>> time characteristic to ProcessingTime (via
>> ctx.timerService().registerEventTimeTimer(...))? As far as I
>> understood from the documentation, event time timers need watermarks,
>> which set the operator time, so I wonder if I can use event time
>> timers in case ProcessingTime is selected in the environment? Also,
>> when I set the time characteristic to EventTime, can I use both
>> processing time timers & event time timers without any problem?
>>
>> Thanks,


Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-28 Thread tao xiao
My job is very simple, as you can see from the code I pasted. I simply print
the number to stdout. If you look at the log, the numbers continued to
print out after checkpoint 1, which indicates no backpressure was
happening. It is very easy to reproduce this if you run the code I
provided in an IDE.


LOG

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
Checkpoint was declined.
(org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out
(1/1)#0. Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
Caused by: org.apache.flink.util.SerializedThrowable: npe
at
com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
~[classes/:?]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
~[flink-streaming

Re: Event Time Timers in Process functions when time characteristic is ProcessingTime

2021-06-28 Thread JING ZHANG
Hi lenduha,

> When setting the time characteristic to ProcessingTime via the
setStreamTimeCharacteristic(...) call, I cannot see watermarks in the Flink
UI.

The watermark would be swallowed in this case.

> can I use event time timers even if I set the
time characteristic to ProcessingTime (via
ctx.timerService().registerEventTimeTimer(...))?

I'm afraid not. Event time timers cannot work after setting
ProcessingTime via setStreamTimeCharacteristic,
because the watermark is swallowed in that case, so the event time
timers would never be triggered.

> when I set the time characteristic to EventTime, can I use both
processing time timers & event time timers without any problem?

Event time timers can work, of course. Besides, explicitly using
processing-time windows and timers also works in event-time mode.
Please note that in Flink 1.12, the default stream time characteristic has
been changed to EventTime [1].
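
For example, in event-time mode both timer kinds can be registered side by
side; a minimal sketch (MyEvent is a placeholder for your event type):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BothTimersFn extends KeyedProcessFunction<String, MyEvent, String> {

    @Override
    public void processElement(MyEvent value, Context ctx, Collector<String> out) {
        // event-time timer: fires when the watermark passes the timestamp
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
        // processing-time timer: fires by wall clock, independent of watermarks
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60_000L);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired at " + ts + " in domain " + ctx.timeDomain());
    }
}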

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/release-notes/flink-1.12/#flip-134-batch-execution-for-the-datastream-api

Best regards,
JING ZHANG

Deniz Koçak wrote on Mon, Jun 28, 2021 at 2:43 PM:

> Hi,
>
> In the environment configuration, when setting the time characteristic to
> ProcessingTime via the setStreamTimeCharacteristic(...) call, I cannot see
> watermarks in the Flink UI. I think this is expected, because
> watermarks are disabled in the source (using Kafka as the source)?
>
> Another point here is: can I use event time timers even if I set the
> time characteristic to ProcessingTime (via
> ctx.timerService().registerEventTimeTimer(...))? As far as I
> understood from the documentation, event time timers need watermarks,
> which set the operator time, so I wonder if I can use event time
> timers in case ProcessingTime is selected in the environment? Also,
> when I set the time characteristic to EventTime, can I use both
> processing time timers & event time timers without any problem?
>
> Thanks,
>


Re: [Flink SQL] Lookup join hbase problem

2021-06-28 Thread JING ZHANG
Hi houyin, Jark,
Sorry, I missed Jark's response before.
> We should support lookup HBase on multiple fields (by Get#setFilter).
Feel free to open issues.
I agree with Jark. It would be better to support multiple lookup keys for
HBase.
> How can I solve this problem?
Until HBase lookup supports multiple keys, we could use the solution
mentioned in my last email as a workaround.
Because only constant values and field input refs can be parsed as lookup
keys, you could add some operation on t2.city_code, for example updating
`AND t1.city_code = t2.city_code` to `AND UPPER(t1.city_code) =
UPPER(t2.city_code)` or `AND CAST(t1.city_code AS VARCHAR) =
CAST(t2.city_code AS VARCHAR)`.
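
Applied to your query, only the tail changes, e.g. (a sketch):

ON concat(t1.ucid, '&', t1.city_code) = t2.rowKey
WHERE t2.city_code IS NOT NULL
AND CAST(t1.city_code AS VARCHAR) = CAST(t2.city_code AS VARCHAR);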

Best regards,
JING ZHANG

JING ZHANG wrote on Mon, Jun 28, 2021 at 2:43 PM:

> Hi houyin,
> > Maybe it is because the conditions in the where clause are being pushed
> down as a predicate into the join clause?
> Yes, after pushdown the HBase lookup keys are `rowKey` and `city_code`,
> which triggers the above exception.
> > How can I solve this problem?
> Because only constant values and field input refs can be parsed as lookup
> keys, you could add some operation on t2.city_code, for example updating
> `AND t1.city_code = t2.city_code` to `AND UPPER(t1.city_code) =
> UPPER(t2.city_code)` or `AND CAST(t1.city_code AS VARCHAR) =
> CAST(t2.city_code AS VARCHAR)`
>
> Best regards,
> JING ZHANG
>
> 纳兰清风 wrote on Mon, Jun 28, 2021 at 12:48 PM:
>
>> Hi,
>>
>>   When I was using hbase table as my lookup table, I got this error:
>>
>> Caused by: java.lang.IllegalArgumentException: Currently, HBase table
>> can only be lookup by single row key.
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>>
>> My SQL is
>>
>> insert into sink_kafka(user_ucid,city_code,`source`,system_type)
>> SELECT t1.ucid AS user_ucid,
>>  t1.city_code,
>>  t1.`source`,
>>  t1.system_type
>> FROM tmp_ucid_check t1
>> LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache
>> for SYSTEM_TIME AS OF t1.proctime AS t2
>> ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
>> WHERE t2.city_code is NOT null
>> AND t1.city_code = t2.city_code;
>>
>> Maybe it is because the conditions in the where clause are being pushed
>> down as a predicate into the join clause?
>> How can I solve this problem ?
>>
>> Thank you
>>
>>
>>
>>
>