?????? need help about "incremental checkpoint",Thanks

2020-10-02 Thread ??????
where's the actual path?
I can only get one path from the WEB UI


Is it possible that this error happened in step 5 is due to my code's 
fault?


--  --
??: 
   "??" 
   
<753743...@qq.com;
:2020??10??3??(??) 9:13
??:"David Anderson"https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure

 at the end of above link,it said:

 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]

 I have tried the above command in previous experiment,but still no luck.
 And why the above official command has " :" after "run -s"?
 I guess " :" not necessary.

 Could you tell me what the right command is to recover(resume) from 
incremental checkpoint(RocksdbStateBackEnd)?

 Much Thanks~!


 --  --
 ??: "??" https://flink.apache.org/zh/community.html for how to join the list.

 Best,
 David

 On Fri, Oct 2, 2020 at 4:45 PM ?? https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

 Best,
 David

 On Fri, Oct 2, 2020 at 4:07 PM ?? https://paste.ubuntu.com/p/DpTyQKq6Vk/

 

 pom.xml is:

 http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
 https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --
 https://mvnrepository.com/artifact/org.apache.flink/flink-cep 
--
 https://paste.ubuntu.com/p/49HRYXFzR2/

 

 some of the above error is:

 Caused by: java.lang.IllegalStateException: Unexpected state 
handle type, expected: class 
org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle

 

 

 The steps are:

 1.mvn clean scala:compile compile package

 2.nc -lk 

 3.flink run -c wordcount_increstate 
datastream_api-1.0-SNAPSHOT.jar
 Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b

 4.input the following conents in nc -lk 

 before
 error
 error
 error
 error

 5.

 flink run -s 
hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c 
StateWordCount datastream_api-1.0-SNAPSHOT.jar

 Then the above error happens.

 

 Please help,Thanks~!


 I have tried to subscried to user@flink.apache.org;

 but no replies.If possible ,send to appleyu...@foxmail.com with 
your valuable replies,thanks.

 

Re: Flink 1.12 cannot handle large schema

2020-10-02 Thread Lian Jiang
Appreciate Arvid for the jira and the workaround. I will monitor the jira
status and retry when the fix is available. I can help test the fix when it
is in a private branch. Thanks. Regards!

On Fri, Oct 2, 2020 at 3:57 AM Arvid Heise  wrote:

> Hi Lian,
>
> Thank you for reporting. It looks like a bug to me and I created a ticket
> [1].
>
> You have two options: wait for the fix or implement the fix yourself (copy
> AvroSerializerSnapshot and use another way to write/read the schema), then
> subclass AvroSerializer to use your snapshot. Of course, we are happy for
> any patch.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19491
>
> On Fri, Oct 2, 2020 at 2:23 AM Lian Jiang  wrote:
>
>> Hi,
>>
>> I am using Flink 1.12 snapshot built on my machine. My job throws an
>> exception when writeUTF a schema from the schema registry.
>>
>> Caused by: java.io.UTFDataFormatException: encoded string too long:
>> 223502 bytes
>> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364)
>> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
>> at
>> org.apache.flink.formats.avro.typeutils.AvroSerializerSnapshot.writeSnapshot(AvroSerializerSnapshot.java:75)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
>> at
>> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:159)
>> at
>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.writeSnapshot(CompositeTypeSerializerSnapshot.java:148)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.write(TypeSerializerSnapshotSerializationUtil.java:138)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:55)
>> at
>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentWriterImpl.writeStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:183)
>> at
>> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:126)
>> at
>> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:171)
>> at
>> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>> at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:510)
>> ... 5 common frames omitted
>>
>> According to
>> https://stackoverflow.com/questions/22741556/dataoutputstream-purpose-of-the-encoded-string-too-long-restriction,
>> java.io.DataOutputStream can only handle max length 65535 strings. Due to
>> this issue, my job cannot deserialize the kafka messages. Any idea is
>> highly appreciated!
>>
>>
>> Regards
>> Lian
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Create your own email signature



Re: FlinkCounterWrapper

2020-10-02 Thread Richard Moorhead
Furthermore, it looks like the rest of the dropwizard wrappers all have the
mutators implemented.

https://issues.apache.org/jira/browse/FLINK-19497

On Fri, Oct 2, 2020 at 2:30 PM Richard Moorhead 
wrote:

> We have a use case wherein counters emitted by flink are decremented after
> being reported. In this way we report only the change in the counter.
> Currently it seems that FlinkCounterWrapper doesnt mutate the wrapped
> counter when either inc or dec is called; would this be a valid improvement?
>
>
>
> https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
>


FlinkCounterWrapper

2020-10-02 Thread Richard Moorhead
We have a use case wherein counters emitted by flink are decremented after
being reported. In this way we report only the change in the counter.
Currently it seems that FlinkCounterWrapper doesnt mutate the wrapped
counter when either inc or dec is called; would this be a valid improvement?


https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java


Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-10-02 Thread Dan Hill
Thanks, Timo and Piotr!

I figured out my issue.  I called env.disableOperatorChaining(); in my
developer mode.  Disabling operator chaining created the redundant joins.



On Mon, Sep 28, 2020 at 6:41 AM Timo Walther  wrote:

> Hi Dan,
>
> unfortunetely, it is very difficult to read you plan? Maybe you can
> share a higher resolution and highlight which part of the pipeline is A,
> B etc. In general, the planner should be smart enough to reuse subplans
> where appropriate. Maybe this is a bug or shortcoming in the optimizer
> rules that we can fix.
>
> Piotr's suggestion would work to "materialize" a part of the plan to
> DataStream API such that this part is a black box for the optimizer and
> read only once. Currently, there is no API for performing this in the
> Table API itself.
>
> Regards,
> Timo
>
> On 28.09.20 15:13, Piotr Nowojski wrote:
> > Hi Dan,
> >
> > Are we talking about Streaming SQL (from the presence of IntervalJoin
> > node I presume so)? Are you using blink planner?
> >
> > I'm not super familiar with the Flink SQL, but my best guess would be
> > that if you would "export" the view "A" as a DataStream, then
> > re-register it as a new table "A2" and use "A2" in your query, it could
> > do the trick. [1]
> > But I might be wrong or there might be a better way to do it (maybe
> > someone else can help here?).
> >
> > Piotrek
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#integration-with-datastream-and-dataset-api
> >
> > sob., 26 wrz 2020 o 00:02 Dan Hill  > > napisał(a):
> >
> > I have a temporary views, A and B, and I want to output a union like
> > the following:
> > SELECT * FROM ((SELECT ... FROM A) UNION ALL (SELECT ... FROM B JOIN
> > A ...))
> >
> > Since the columns being requested in both parts of the union are
> > different, the planner appears to be separating these out.  A is
> > pretty complex so I want to reuse A.  Here's the graph for A.  A
> > bunch of extra join nodes are introduced.
> >
> > Just A.
> > Screen Shot 2020-09-22 at 11.14.07 PM.png
> >
> > How the planner currently handles the union.  It creates a bunch of
> > inefficient extra join nodes since the columns are slightly
> different.
> > Screen Shot 2020-09-23 at 12.24.59 PM.png
> >
>
>


Re: what's wrong with my pojo when it's used by flink ?Thanks

2020-10-02 Thread Arvid Heise
Hi 大森林,

if you look in the full logs you'll see

3989 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] -
class org.apache.flink.test.checkpointing.UserActionLogPOJO does not
contain a getter for field itemId
3999 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] -
class org.apache.flink.test.checkpointing.UserActionLogPOJO does not
contain a setter for field itemId
3999 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] -
Class class org.apache.flink.test.checkpointing.UserActionLogPOJO cannot be
used as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on "Data
Types & Serialization" for details of the effect on performance.
Exception in thread "main"
org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
Cannot reference field by field expression on
GenericTypeField
expressions are only supported on POJO types, tuples, and case classes.
(See the Flink documentation on what is considered a POJO.)
at
org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
at
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.(ComparableAggregator.java:67)
at
org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:809)
at
org.apache.flink.test.checkpointing.Aggregate.main(UnalignedCheckpointITCase.java:701)

The issue is that in your POJO your setter/getter do not match the field
names. It's easiest to let your IDE generate them for you. For example, if
you keep the current field names, you need to add


public String getItemId() {
   return itemId;
}

public void setItemId(String itemId) {
   this.itemId = itemId;
}

public void setPrice(int price) {
   this.price = price;
}

public void setUserId(String userId) {
   this.userId = userId;
}

public int getPrice() {
   return price;
}

public String getUserId() {
   return userId;
}

As you can see, none of your getters/setters is according to Java Beans
specification and you need to add them all.

On Fri, Oct 2, 2020 at 4:39 PM 大森林  wrote:

>
> I want to do an experiment with the operator "aggregate"
>
> My code is:
>
> Aggregate.java
> https://paste.ubuntu.com/p/vvMKqZXt3r/
>
> UserActionLogPOJO.java
> https://paste.ubuntu.com/p/rfszzKbxDC/
>
>
> The error I got is:
>
> Exception in thread "main"
> org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
> Cannot reference field by field expression on
> GenericTypeField expressions are only supported on POJO
> types, tuples, and case classes. (See the Flink documentation on what is
> considered a POJO.)
> at
> org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
> at
> org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.(ComparableAggregator.java:67)
> at
> org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:836)
> at Aggregate.main(Aggregate.java:52)
>
> Process finished with exit code 1
>
> *Could you tell me where I am wrong with UserActionLogPOJO.java?*
>
> *Thanks for your help*
>
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-02 Thread Austin Cawley-Edwards
Hey Till,

Thanks for the notes. Yeah, the docs don't mention anything specific to
this case, not sure if it's an uncommon one. Assigning timestamps on
conversion does solve the issue. I'm happy to take a stab at implementing
the feature if it is indeed missing and you all think it'd be worthwhile. I
think it's definitely a confusing aspect of working w/ the Table &
DataStream APIs together.

Best,
Austin

On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann  wrote:

> Hi Austin,
>
> yes, it should also work for ingestion time.
>
> I am not entirely sure whether event time is preserved when converting a
> Table into a retract stream. It should be possible and if it is not
> working, then I guess it is a missing feature. But I am sure that @Timo
> Walther  knows more about it. In doubt, you could
> assign a new watermark generator when having obtained the retract stream.
>
> Here is also a link to some information about event time and watermarks
> [1]. Unfortunately, it does not state anything about the direction Table =>
> DataStream.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
>
> Cheers,
> Till
>
> On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Till,
>>
>> Just a quick question on time characteristics -- this should work for
>> IngestionTime as well, correct? Is there anything special I need to do to
>> have the CsvTableSource/ toRetractStream call to carry through the assigned
>> timestamps, or do I have to re-assign timestamps during the conversion? I'm
>> currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp
>> marker)` error, though I'm seeing timestamps being assigned if I step
>> through the AutomaticWatermarkContext.
>>
>> Thanks,
>> Austin
>>
>> On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Perfect, thanks so much Till!
>>>
>>> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann 
>>> wrote:
>>>
 Hi Austin,

 I believe that the problem is the processing time window. Unlike for
 event time where we send a MAX_WATERMARK at the end of the stream to
 trigger all remaining windows, this does not happen for processing time
 windows. Hence, if your stream ends and you still have an open processing
 time window, then it will never get triggered.

 The problem should disappear if you use event time or if you process
 unbounded streams which never end.

 Cheers,
 Till

 On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hey all,
>
> Thanks for your patience. I've got a small repo that reproduces the
> issue here: https://github.com/austince/flink-1.10-sql-windowing-error
>
> Not sure what I'm doing wrong but it feels silly.
>
> Thanks so much!
> Austin
>
> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Till,
>>
>> Thanks for the reply -- I'll try to see if I can reproduce this in a
>> small repo and share it with you.
>>
>> Best,
>> Austin
>>
>> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Austin,
>>>
>>> could you share with us the exact job you are running (including the
>>> custom window trigger)? This would help us to better understand your
>>> problem.
>>>
>>> I am also pulling in Klou and Timo who might help with the windowing
>>> logic and the Table to DataStream conversion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey all,

 I'm not sure if I've missed something in the docs, but I'm having a
 bit of trouble with a streaming SQL job that starts w/ raw SQL queries 
 and
 then transitions to a more traditional streaming job. I'm on Flink 1.10
 using the Blink planner, running locally with no checkpointing.

 The job looks roughly like:

 CSV 1 -->
 CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time
 window w/ process func & custom trigger --> some other ops
 CSV 3 -->


 When I remove the windowing directly after the `toRetractStream`,
 the records make it to the "some other ops" stage, but with the 
 windowing,
 those operations are sometimes not sent any data. I can also get data 
 sent
 to the downstream operators by putting in a no-op map before the 
 window and
 placing some breakpoints in there to manually slow down processing.


 The logs don't seem to indicate anything went wrong and generally
 look like:

 4819 [Source: Custom File source (1/1)] INFO

Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Thread Eleanore Jin
Thanks a lot for the confirmation.

Eleanore

On Fri, Oct 2, 2020 at 2:42 AM Chesnay Schepler  wrote:

> Yes, the patch call only triggers the cancellation.
> You can check whether it is complete by polling the job status via
> jobs/ and checking whether state is CANCELED.
>
> On 9/27/2020 7:02 PM, Eleanore Jin wrote:
>
> I have noticed this: if I have Thread.sleep(1500); after the patch call
> returned 202, then the directory gets cleaned up, in the meanwhile, it
> shows the job-manager pod is in completed state before getting terminated:
> see screenshot: https://ibb.co/3F8HsvG
>
> So the patch call is async to terminate the job? Is there a way to check
> if cancel is completed? So that the stop tm and jm can be called afterwards?
>
> Thanks a lot!
> Eleanore
>
>
> On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin 
> wrote:
>
>> Hi Congxian,
>> I am making rest call to get the checkpoint config: curl -X GET \
>>
>> http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config
>>
>> and here is the response:
>> {
>> "mode": "at_least_once",
>> "interval": 3000,
>> "timeout": 1,
>> "min_pause": 1000,
>> "max_concurrent": 1,
>> "externalization": {
>> "enabled": false,
>> "delete_on_cancellation": true
>> },
>> "state_backend": "FsStateBackend"
>> }
>>
>> I uploaded a screenshot of how azure blob storage looks like after the
>> cancel call : https://ibb.co/vY64pMZ
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
>> wrote:
>>
>>> Hi Eleanore
>>>
>>> What the `CheckpointRetentionPolicy`[1] did you set for your job? if
>>> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
>>> checkpoint will be kept when canceling a job.
>>>
>>> PS the image did not show
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>> Best,
>>> Congxian
>>>
>>>
>>> Eleanore Jin  于2020年9月27日周日 下午1:50写道:
>>>
 Hi experts,

 I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint
 is enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
 using FsStateBackend, snapshots are persisted to azure blob storage
 (Microsoft cloud storage service).

 Checkpointed state is just source kafka topic offsets, the flink job is
 stateless as it does filter/json transformation.

 The way I am trying to stop the flink job is via monitoring rest api
 mentioned in doc
 

 e.g.
 curl -X PATCH \
   '
 http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
 \
   -H 'Content-Type: application/json' \
   -d '{}'

 This call returned successfully with statusCode 202, then I stopped the
 task manager pods and job manager pod.

 According to the doc, the checkpoint should be cleaned up after the job
 is stopped/cancelled.
 What I have observed is, the checkpoint dir is not cleaned up, can you
 please shield some lights on what I did wrong?

 Below shows the checkpoint dir for a cancelled flink job.
 [image: image.png]

 Thanks!
 Eleanore


>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Thread Eleanore Jin
Thanks a lot for the confirmation.

Eleanore

On Fri, Oct 2, 2020 at 2:42 AM Chesnay Schepler  wrote:

> Yes, the patch call only triggers the cancellation.
> You can check whether it is complete by polling the job status via
> jobs/ and checking whether state is CANCELED.
>
> On 9/27/2020 7:02 PM, Eleanore Jin wrote:
>
> I have noticed this: if I have Thread.sleep(1500); after the patch call
> returned 202, then the directory gets cleaned up, in the meanwhile, it
> shows the job-manager pod is in completed state before getting terminated:
> see screenshot: https://ibb.co/3F8HsvG
>
> So the patch call is async to terminate the job? Is there a way to check
> if cancel is completed? So that the stop tm and jm can be called afterwards?
>
> Thanks a lot!
> Eleanore
>
>
> On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin 
> wrote:
>
>> Hi Congxian,
>> I am making rest call to get the checkpoint config: curl -X GET \
>>
>> http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config
>>
>> and here is the response:
>> {
>> "mode": "at_least_once",
>> "interval": 3000,
>> "timeout": 1,
>> "min_pause": 1000,
>> "max_concurrent": 1,
>> "externalization": {
>> "enabled": false,
>> "delete_on_cancellation": true
>> },
>> "state_backend": "FsStateBackend"
>> }
>>
>> I uploaded a screenshot of how azure blob storage looks like after the
>> cancel call : https://ibb.co/vY64pMZ
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
>> wrote:
>>
>>> Hi Eleanore
>>>
>>> What the `CheckpointRetentionPolicy`[1] did you set for your job? if
>>> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
>>> checkpoint will be kept when canceling a job.
>>>
>>> PS the image did not show
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>> Best,
>>> Congxian
>>>
>>>
>>> Eleanore Jin  于2020年9月27日周日 下午1:50写道:
>>>
 Hi experts,

 I am running flink 1.10.2 on kubernetes as per job cluster. Checkpoint
 is enabled, with interval 3s, minimumPause 1s, timeout 10s. I'm
 using FsStateBackend, snapshots are persisted to azure blob storage
 (Microsoft cloud storage service).

 Checkpointed state is just source kafka topic offsets, the flink job is
 stateless as it does filter/json transformation.

 The way I am trying to stop the flink job is via monitoring rest api
 mentioned in doc
 

 e.g.
 curl -X PATCH \
   '
 http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
 \
   -H 'Content-Type: application/json' \
   -d '{}'

 This call returned successfully with statusCode 202, then I stopped the
 task manager pods and job manager pod.

 According to the doc, the checkpoint should be cleaned up after the job
 is stopped/cancelled.
 What I have observed is, the checkpoint dir is not cleaned up, can you
 please shield some lights on what I did wrong?

 Below shows the checkpoint dir for a cancelled flink job.
 [image: image.png]

 Thanks!
 Eleanore


>


Re: need help about "incremental checkpoint",Thanks

2020-10-02 Thread David Anderson
If hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3
was written by the RocksDbStateBackend, then you can use it to recover if
the new job is also using the RocksDbStateBackend. The command would be

$ bin/flink run -s
hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3  [args]

The ":" character is meant to indicate that you should not use the literal
string "checkpointMetaDataPath", but rather replace that with the actual
path. Do not include the : character.

David

On Fri, Oct 2, 2020 at 5:58 PM 大森林 <753743...@qq.com> wrote:
>
> I have read the official document
>
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>
> at the end of above link,it said:
>
> $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
>
> I have tried the above command in previous experiment,but still no luck.
> And why the above official command has " :" after "run -s"?
> I guess " :" not necessary.
>
> Could you tell me what the right command is to recover(resume) from
incremental checkpoint(RocksdbStateBackEnd)?
>
> Much Thanks~!
>
>
> -- 原始邮件 --
> 发件人: "大森林" ;
> 发送时间: 2020年10月2日(星期五) 晚上11:41
> 收件人: "David Anderson";
> 抄送: "user";
> 主题: 回复: need help about "incremental checkpoint",Thanks
>
> Thanks for your replies~!
>
> Could you tell me what the right command is to recover from checkpoint
 manually using Rocksdb file?
>
> I understand that checkpoint is for automatically recovery,
> but in this experiment I stop it by force(input 4 error in nc -lk ),
> Is there a way to recover from incremental checkpoint manually ( with
RocksdbStateBackend)?
>
> I can only find
hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3  in
my WEB UI (I guess this is only used for fsStateBackend)
>
> Thanks for your help~!
>
> -- 原始邮件 --
> 发件人: "David Anderson" ;
> 发送时间: 2020年10月2日(星期五) 晚上11:24
> 收件人: "大森林";
> 抄送: "user";
> 主题: Re: need help about "incremental checkpoint",Thanks
>
>> Write in RocksDbStateBackend.
>> Read in FsStateBackend.
>> It's NOT a match.
>
>
> Yes, that is right. Also, this does not work:
>
> Write in FsStateBackend
> Read in RocksDbStateBackend
>
> For questions and support in Chinese, you can use the
user...@flink.apache.org. See the instructions at
https://flink.apache.org/zh/community.html for how to join the list.
>
> Best,
> David
>
> On Fri, Oct 2, 2020 at 4:45 PM 大森林  wrote:
>>
>> Thanks for your replies~!
>>
>> My English is poor ,I have an understanding of your replies:
>>
>> Write in RocksDbStateBackend.
>> Read in FsStateBackend.
>> It's NOT a match.
>> So I'm wrong in step 5?
>> Is my above understanding right?
>>
>> Thanks for your help.
>>
>> -- 原始邮件 --
>> 发件人: "David Anderson" ;
>> 发送时间: 2020年10月2日(星期五) 晚上10:35
>> 收件人: "大森林";
>> 抄送: "user";
>> 主题: Re: need help about "incremental checkpoint",Thanks
>>
>> It looks like you were trying to resume from a checkpoint taken with the
FsStateBackend into a revised version of the job that uses the
RocksDbStateBackend. Switching state backends in this way is not supported:
checkpoints and savepoints are written in a state-backend-specific format,
and can only be read by the same backend that wrote them.
>>
>> It is possible, however, to migrate between state backends using the
State Processor API [1].
>>
>> [1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Best,
>> David
>>
>> On Fri, Oct 2, 2020 at 4:07 PM 大森林  wrote:
>>>
>>> I want to do an experiment of"incremental checkpoint"
>>>
>>> my code is:
>>>
>>> https://paste.ubuntu.com/p/DpTyQKq6Vk/
>>>
>>>
>>>
>>> pom.xml is:
>>>
>>> 
>>> http://maven.apache.org/POM/4.0.0;
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>> 4.0.0
>>>
>>> example
>>> datastream_api
>>> 1.0-SNAPSHOT
>>> 
>>> 
>>> 
>>> org.apache.maven.plugins
>>> maven-compiler-plugin
>>> 3.1
>>> 
>>> 1.8
>>> 1.8
>>> 
>>> 
>>>
>>> 
>>> org.scala-tools
>>> maven-scala-plugin
>>> 2.15.2
>>> 
>>> 
>>> 
>>> compile
>>> testCompile
>>> 
>>> 
>>> 
>>> 
>>>
>>>
>>>
>>> 
>>> 
>>>
>>> 
>>>
>>> 
>>> 
>>> org.apache.flink
>>> flink-streaming-scala_2.11
>>> 1.11.1
>>> provided->
>>> 
>>>
>>> ->
>>> org.apache.flink->
>>> flink-streaming-java_2.12->
>>> 1.11.1->
>>> compile–>->
>>> ->
>>>
>>> 
>>> org.apache.flink
>>> flink-clients_2.11
>>> 1.11.1
>>> 
>>>
>>>
>>>
>>> 
>>> org.apache.flink
>>> flink-statebackend-rocksdb_2.11
>>> 1.11.2
>>> test->
>>> 
>>>
>>> 
>>> org.apache.hadoop
>>> hadoop-client
>>> 3.3.0
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-core
>>> 1.11.1
>>> 
>>>
>>> ->
>>> org.slf4j->
>>> slf4j-simple->
>>> 1.7.25->
>>> compile->
>>> ->
>>>
>>> 
>>> 
>>> org.apache.flink
>>> flink-cep_2.11
>>> 1.11.1
>>> 
>>>
>>> 
>>> org.apache.flink
>>> flink-cep-scala_2.11
>>> 1.11.1
>>> 
>>>
>>> 

?????? need help about "incremental checkpoint",Thanks

2020-10-02 Thread ??????
Thanks for your replies~!


Could you tell me what the right command is to recover from checkpoint 
manually using Rocksdb file?


I understand that checkpoint is for automatically recovery,
but in this experiment I stop it by force(input 4 error in nc -lk ),
Is there a way to recover from incremental checkpoint manually ( with 
RocksdbStateBackend)?


I can only 
findhdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3in
 my WEB UI(I guess this is only used for fsStateBackend)


Thanks for your help~!


----
??: 
   "David Anderson" 
   
https://flink.apache.org/zh/community.html for how 
to join the list.


Best,
David


On Fri, Oct 2, 2020 at 4:45 PM ?? https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html


Best,
David


On Fri, Oct 2, 2020 at 4:07 PM ?? https://paste.ubuntu.com/p/DpTyQKq6Vk/
 

 
pom.xml is:
 
http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
 https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala 
--
 https://mvnrepository.com/artifact/org.apache.flink/flink-cep --
 https://paste.ubuntu.com/p/49HRYXFzR2/
 

 
some of the above error is:
 
Caused by: java.lang.IllegalStateException: Unexpected state handle type, 
expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: 
class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
 

 

 
The steps are:
 
1.mvn clean scala:compile compile package
 
2.nc -lk 
 
3.flink run -c wordcount_increstate datastream_api-1.0-SNAPSHOT.jar
 Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
 
4.input the following conents in nc -lk 
 
before
 error
 error
 error
 error
 
5.
 
flink run -s 
hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c 
StateWordCount datastream_api-1.0-SNAPSHOT.jar
 
Then the above error happens.
 

 
Please help,Thanks~!




I have tried to subscried to user@flink.apache.org;

but no replies.If possible ,send to appleyu...@foxmail.com with your valuable 
replies,thanks.
 


Re: need help about "incremental checkpoint",Thanks

2020-10-02 Thread David Anderson
>
>
> *Write in RocksDbStateBackend.*
> *Read in FsStateBackend.**It's NOT a match.*


Yes, that is right. Also, this does not work:

Write in FsStateBackend
Read in RocksDbStateBackend

For questions and support in Chinese, you can use the
user...@flink.apache.org. See the instructions at
https://flink.apache.org/zh/community.html for how to join the list.

Best,
David

On Fri, Oct 2, 2020 at 4:45 PM 大森林  wrote:

> Thanks for your replies~!
>
> My English is poor ,I have an understanding of your replies:
>
> *Write in RocksDbStateBackend.*
> *Read in FsStateBackend.*
> *It's NOT a match.*
> So I'm wrong in step 5?
> Is my above understanding right?
>
> Thanks for your help.
>
> -- 原始邮件 --
> *发件人:* "David Anderson" ;
> *发送时间:* 2020年10月2日(星期五) 晚上10:35
> *收件人:* "大森林";
> *抄送:* "user";
> *主题:* Re: need help about "incremental checkpoint",Thanks
>
> It looks like you were trying to resume from a checkpoint taken with the
> FsStateBackend into a revised version of the job that uses the
> RocksDbStateBackend. Switching state backends in this way is not supported:
> checkpoints and savepoints are written in a state-backend-specific format,
> and can only be read by the same backend that wrote them.
>
> It is possible, however, to migrate between state backends using the State
> Processor API [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Best,
> David
>
> On Fri, Oct 2, 2020 at 4:07 PM 大森林  wrote:
>
>> *I want to do an experiment of"incremental checkpoint"*
>>
>> my code is:
>>
>> https://paste.ubuntu.com/p/DpTyQKq6Vk/
>>
>>
>>
>> pom.xml is:
>>
>> 
>> http://maven.apache.org/POM/4.0.0;
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>> 4.0.0
>>
>> example
>> datastream_api
>> 1.0-SNAPSHOT
>> 
>> 
>> 
>> org.apache.maven.plugins
>> maven-compiler-plugin
>> 3.1
>> 
>> 1.8
>> 1.8
>> 
>> 
>>
>> 
>> org.scala-tools
>> maven-scala-plugin
>> 2.15.2
>> 
>> 
>> 
>> compile
>> testCompile
>> 
>> 
>> 
>> 
>>
>>
>>
>> 
>> 
>>
>> 
>>
>> 
>> 
>> org.apache.flink
>> flink-streaming-scala_2.11
>> 1.11.1
>> provided->
>> 
>>
>> ->
>> org.apache.flink->
>> flink-streaming-java_2.12->
>> 1.11.1->
>> compile–>->
>> ->
>>
>> 
>> org.apache.flink
>> flink-clients_2.11
>> 1.11.1
>> 
>>
>>
>>
>> 
>> org.apache.flink
>> flink-statebackend-rocksdb_2.11
>> 1.11.2
>> test->
>> 
>>
>> 
>> org.apache.hadoop
>> hadoop-client
>> 3.3.0
>> 
>>
>> 
>> org.apache.flink
>> flink-core
>> 1.11.1
>> 
>>
>> ->
>> org.slf4j->
>> slf4j-simple->
>> 1.7.25->
>> compile->
>> ->
>>
>> 
>> 
>> org.apache.flink
>> flink-cep_2.11
>> 1.11.1
>> 
>>
>> 
>> org.apache.flink
>> flink-cep-scala_2.11
>> 1.11.1
>> 
>>
>> 
>> org.apache.flink
>> flink-scala_2.11
>> 1.11.1
>> 
>>
>>
>>
>> 
>> org.projectlombok
>> lombok
>> 1.18.4
>> provided->
>> 
>>
>> 
>> 
>>
>>
>>
>> the error I got is:
>>
>> https://paste.ubuntu.com/p/49HRYXFzR2/
>>
>>
>>
>> *some of the above error is:*
>>
>> *Caused by: java.lang.IllegalStateException: Unexpected state handle
>> type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle,
>> but found: class
>> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle*
>>
>>
>>
>>
>>
>> The steps are:
>>
>> 1.mvn clean scala:compile compile package
>>
>> 2.nc -lk 
>>
>> 3.flink run -c wordcount_increstate  datastream_api-1.0-SNAPSHOT.jar
>> Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
>>
>> 4.input the following conents in nc -lk 
>>
>> before
>> error
>> error
>> error
>> error
>>
>> 5.
>>
>> flink run -s
>> hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c
>> StateWordCount datastream_api-1.0-SNAPSHOT.jar
>>
>> Then the above error happens.
>>
>>
>>
>> Please help,Thanks~!
>>
>>
>> I have tried to subscried to user@flink.apache.org;
>>
>> but no replies.If possible ,send to appleyu...@foxmail.com with your
>> valuable replies,thanks.
>>
>>
>>
>


?????? need help about "incremental checkpoint",Thanks

2020-10-02 Thread ??????
Thanks for your replies~!


My English is poor ,I have an understanding of your replies:


Write in RocksDbStateBackend.
Read in FsStateBackend.
It's NOT a match.
So I'm wrong in step 5?
Is my above understanding right?


Thanks for your help.


----
??: 
   "David Anderson" 
   
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html


Best,
David


On Fri, Oct 2, 2020 at 4:07 PM ?? https://paste.ubuntu.com/p/DpTyQKq6Vk/
 

 
pom.xml is:
 
http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
 https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala 
--
 https://mvnrepository.com/artifact/org.apache.flink/flink-cep --
 https://paste.ubuntu.com/p/49HRYXFzR2/
 

 
some of the above error is:
 
Caused by: java.lang.IllegalStateException: Unexpected state handle type, 
expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: 
class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
 

 

 
The steps are:
 
1.mvn clean scala:compile compile package
 
2.nc -lk 
 
3.flink run -c wordcount_increstate datastream_api-1.0-SNAPSHOT.jar
 Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
 
4.input the following conents in nc -lk 
 
before
 error
 error
 error
 error
 
5.
 
flink run -s 
hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c 
StateWordCount datastream_api-1.0-SNAPSHOT.jar
 
Then the above error happens.
 

 
Please help,Thanks~!




I have tried to subscried to user@flink.apache.org;

but no replies.If possible ,send to appleyu...@foxmail.com with your valuable 
replies,thanks.
 


what's wrong with my pojo when it's used by flink ?Thanks

2020-10-02 Thread ??????
I want to do an experiment with the operator "aggregate"


My code is:


Aggregate.java

https://paste.ubuntu.com/p/vvMKqZXt3r/


UserActionLogPOJO.java
https://paste.ubuntu.com/p/rfszzKbxDC/




The error I got is:



Exception in thread "main" 
org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
 Cannot reference field by field expression on 
GenericType

Re: need help about "incremental checkpoint",Thanks

2020-10-02 Thread David Anderson
It looks like you were trying to resume from a checkpoint taken with the
FsStateBackend into a revised version of the job that uses the
RocksDbStateBackend. Switching state backends in this way is not supported:
checkpoints and savepoints are written in a state-backend-specific format,
and can only be read by the same backend that wrote them.

It is possible, however, to migrate between state backends using the State
Processor API [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Best,
David

On Fri, Oct 2, 2020 at 4:07 PM 大森林  wrote:

> *I want to do an experiment of"incremental checkpoint"*
>
> my code is:
>
> https://paste.ubuntu.com/p/DpTyQKq6Vk/
>
>
>
> pom.xml is:
>
> 
> http://maven.apache.org/POM/4.0.0;
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> example
> datastream_api
> 1.0-SNAPSHOT
> 
> 
> 
> org.apache.maven.plugins
> maven-compiler-plugin
> 3.1
> 
> 1.8
> 1.8
> 
> 
>
> 
> org.scala-tools
> maven-scala-plugin
> 2.15.2
> 
> 
> 
> compile
> testCompile
> 
> 
> 
> 
>
>
>
> 
> 
>
> 
>
> 
> 
> org.apache.flink
> flink-streaming-scala_2.11
> 1.11.1
> provided->
> 
>
> ->
> org.apache.flink->
> flink-streaming-java_2.12->
> 1.11.1->
> compile–>->
> ->
>
> 
> org.apache.flink
> flink-clients_2.11
> 1.11.1
> 
>
>
>
> 
> org.apache.flink
> flink-statebackend-rocksdb_2.11
> 1.11.2
> test->
> 
>
> 
> org.apache.hadoop
> hadoop-client
> 3.3.0
> 
>
> 
> org.apache.flink
> flink-core
> 1.11.1
> 
>
> ->
> org.slf4j->
> slf4j-simple->
> 1.7.25->
> compile->
> ->
>
> 
> 
> org.apache.flink
> flink-cep_2.11
> 1.11.1
> 
>
> 
> org.apache.flink
> flink-cep-scala_2.11
> 1.11.1
> 
>
> 
> org.apache.flink
> flink-scala_2.11
> 1.11.1
> 
>
>
>
> 
> org.projectlombok
> lombok
> 1.18.4
> provided->
> 
>
> 
> 
>
>
>
> the error I got is:
>
> https://paste.ubuntu.com/p/49HRYXFzR2/
>
>
>
> *some of the above error is:*
>
> *Caused by: java.lang.IllegalStateException: Unexpected state handle type,
> expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but
> found: class
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle*
>
>
>
>
>
> The steps are:
>
> 1.mvn clean scala:compile compile package
>
> 2.nc -lk 
>
> 3.flink run -c wordcount_increstate  datastream_api-1.0-SNAPSHOT.jar
> Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
>
> 4.input the following conents in nc -lk 
>
> before
> error
> error
> error
> error
>
> 5.
>
> flink run -s
> hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c
> StateWordCount datastream_api-1.0-SNAPSHOT.jar
>
> Then the above error happens.
>
>
>
> Please help,Thanks~!
>
>
> I have tried to subscried to user@flink.apache.org;
>
> but no replies.If possible ,send to appleyu...@foxmail.com with your
> valuable replies,thanks.
>
>
>


need help about "incremental checkpoint",Thanks

2020-10-02 Thread ??????
I want to do an experiment of"incremental checkpoint"
 
my code is:
 
https://paste.ubuntu.com/p/DpTyQKq6Vk/
 

 
pom.xml is:
 
http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
 https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala 
--
 https://mvnrepository.com/artifact/org.apache.flink/flink-cep --
 https://paste.ubuntu.com/p/49HRYXFzR2/
 

 
some of the above error is:
 
Caused by: java.lang.IllegalStateException: Unexpected state handle type, 
expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: 
class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
 

 

 
The steps are:
 
1.mvn clean scala:compile compile package
 
2.nc -lk 
 
3.flink run -c wordcount_increstate datastream_api-1.0-SNAPSHOT.jar
 Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
 
4.input the following conents in nc -lk 
 
before
 error
 error
 error
 error
 
5.
 
flink run -s 
hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c 
StateWordCount datastream_api-1.0-SNAPSHOT.jar
 
Then the above error happens.
 

 
Please help,Thanks~!




I have tried to subscried to user@flink.apache.org;

but no replies.If possible ,send to appleyu...@foxmail.com with your valuable 
replies,thanks.
 


Re: Help with Python Stateful Functions Types

2020-10-02 Thread Igal Shilman
Hi Dan,

I'm assuming that you have different Kafka topics, and each topic contains
messages of a single protobuf type.

In that case, you have to specify the mapping between a topic name to it's
Protobuf message type.
To do that, assume that you have a Kafka topic *A* that contains protobuf
messages of type my.namespace.com/IngressMessage*A*, and
a Kafka topic *B* that contains protobuf message of type
my.namespace.com/IngressMessage*B,* then your ingress definition would look
like this:

- ingress:
meta:
  type: statefun.kafka.io/routable-protobuf-ingress
  id: ...
spec:
  ...
  topics:
- topic: A
  typeUrl: com.googleapis/my.namespace.com/IngressMessageA
  targets:
- example/greeter
   - topic: B
typeUrl: com.googleapis/my.namespace.com/IngressMessageB
targets:
- example/greeter

Now, your Python function (example/greeter) will receive a Protobuf Any
type [1]
with its typeUrl set according to the ingress definition, and you can
safely unpack it. [2]

@functions.bind("example/greeter")
def greeter(context, message):
if message.Is(IngressMessageA.DESCRIPTOR):
  ...
   elif message.Is(IngressMessageB.DESCRIPTOR):
  ..



[1]
https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Any
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-walkthrough-example/walkthrough.py#L50,L59

I hope that helps,
Igal.

On Wed, Sep 30, 2020 at 9:10 PM Clements, Danial C <
danial.cleme...@optum.com> wrote:

> Hi,
>
>
>
> I’m trying to work through an example with Flink Stateful Functions in
> Python.  I have a series of custom protobuf messages that I’ve defined but
> I’m struggling with how they should be provided to the runtime so the
> messages in Kafka can be deserialized.  I see,
>
>
>
> type: statefun.kafka.io/routable-protobuf-ingress
>
> id: example/names
>
> in the example, but how can I change that to
> my.namespace.com/IngressMessage?  Do I need to provide the protobuf
> compiled JAR in my Python app?
>
>
>
> Thanks
>
> Dan
>
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
>


大佬们,有遇到Flink cdc 同步MySQL中的数据,MySQL中的数据有变化,Flink SQL中的表没有实时同步到变化,是什么原因呢?

2020-10-02 Thread chegg_work



Re: Flink on k8s

2020-10-02 Thread Arvid Heise
Hi,

you are missing the Hadoop libraries, hence there is no hdfs support.

In Flink 1.10 and earlier, you would simply copy flink-shaded-hadoop-2-uber[1]
into your opt/ folder. However, since Flink 1.11, we recommend to install
Hadoop and point to it with HADOOP_CLASSPATH.

Now, the latter approach does not seem to be terribly meaningful to K8s.
You can still use the old flink-shaded-hadoop-2-uber for now but it's
better to have a different way in the future.

@Andrey Zagrebin  , do you know what's the current
recommended approach? Should we release a light-weight hdfs filesystem to
make it symmetric to the other filesystems?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#using-flink-shaded-hadoop-2-uber-jar-for-resolving-dependency-conflicts-legacy

On Wed, Sep 30, 2020 at 9:02 AM superainbower  wrote:

> And I got this error log
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
>
> superainbower
> superainbo...@163.com
>
> 
> 签名由 网易邮箱大师  定制
>
> On 09/30/2020 14:48,superainbower
>  wrote:
>
> Hi
> How to configure statebackend when I deploy flink on k8s , I just add
> the following to flink-conf.yaml, but it doesn’t work
>
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
> state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
> state.backend.incremental: true
>
> superainbower
> superainbo...@163.com
>
> 
> 签名由 网易邮箱大师  定制
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: valueState.value throwing null pointer exception

2020-10-02 Thread Arvid Heise
Hi Edward,

you are right to assume that the non-blocking version is the better fit.
You are also correct to assume that kryo just can't handle the underlying
fields.

I'd just go a different way to solve it: add your custom serializer for
PriorityQueue.

There is one [1] for the upcoming(?) Kryo version that you can just copy
and then register in your code [2].

[1]
https://github.com/EsotericSoftware/kryo/commit/a973694dcd13b83e707cfc6c13faa4e812331770
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html

On Fri, Oct 2, 2020 at 11:15 AM Colletta, Edward 
wrote:

> Using Flink 1.9.2, Java, FsStateBackend.
>
>
>
> I was getting com.esotericsoftware.kryo.KryoException:
> java.lang.NullPointerException on a value() operation on a ValueState
> variable in a KeyedProcessFunction.
>
> The object stored in state contained 2 PriorityQueue fields and the error
> message indicated these were the culprits.
>
> I assumed I did not need the concurrent version (PriorityBlockingQueue)
> because it was keyed state so only one task could operate on the variable
> at a time.
>
> And I assumed that checkpointing would not access the variable while I was
> updating it because the checkpointing would not see what I was doing
> between the value() and update()
>
> Operations.  Changing to PriorityBlockingQueue fixed the problem.
>
>
>
> Given that, could it be that Kryo just had an easier time with the
> PriorityBlockingQueue underlying fields, or should do we always need to use
> concurrent versions of objects that are stored in state?
>
>
>
>
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Flink对IoTDB的支持

2020-10-02 Thread milan183sansiro
请问社区有无对IoTDB的source或sink的支持计划



Re: Flink 1.12 cannot handle large schema

2020-10-02 Thread Arvid Heise
Hi Lian,

Thank you for reporting. It looks like a bug to me and I created a ticket
[1].

You have two options: wait for the fix or implement the fix yourself (copy
AvroSerializerSnapshot and use another way to write/read the schema), then
subclass AvroSerializer to use your snapshot. Of course, we are happy for
any patch.

[1] https://issues.apache.org/jira/browse/FLINK-19491

On Fri, Oct 2, 2020 at 2:23 AM Lian Jiang  wrote:

> Hi,
>
> I am using Flink 1.12 snapshot built on my machine. My job throws an
> exception when writeUTF a schema from the schema registry.
>
> Caused by: java.io.UTFDataFormatException: encoded string too long: 223502
> bytes
> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364)
> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializerSnapshot.writeSnapshot(AvroSerializerSnapshot.java:75)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
> at
> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:159)
> at
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.writeSnapshot(CompositeTypeSerializerSnapshot.java:148)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.write(TypeSerializerSnapshotSerializationUtil.java:138)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:55)
> at
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentWriterImpl.writeStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:183)
> at
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:126)
> at
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:171)
> at
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:510)
> ... 5 common frames omitted
>
> According to
> https://stackoverflow.com/questions/22741556/dataoutputstream-purpose-of-the-encoded-string-too-long-restriction,
> java.io.DataOutputStream can only handle max length 65535 strings. Due to
> this issue, my job cannot deserialize the kafka messages. Any idea is
> highly appreciated!
>
>
> Regards
> Lian
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Blobserver dying mid-application

2020-10-02 Thread Till Rohrmann
Hi Andreas,

yes two Flink session clusters won't share the same BlobServer.

Is the problem easily reproducible? If yes, then it could be very helpful
to monitor the backlog length as Chesnay suggested.

One more piece of information is that we create a new TCP connection for
every blob we are downloading from the BlobServer. So if a task needs
multiple jars to run, then every jar is downloaded via its own connection.
However, this should happen sequentially.

Cheers,
Till



On Thu, Oct 1, 2020 at 10:34 PM Hailu, Andreas  wrote:

> @Chesnay:
>
> I see. I actually had a separate thread with Robert Metzger ago regarding
> connection-related issues we’re plagued with at higher parallelisms, and
> his guidance lead us to look into our somaxconn config. We increased it
> from 128 to 1024 in early September. We use the same generic JAR for all of
> our apps, so I don’t think JAR size is the cause. Just so I’m clear: when
> you say Flink session cluster – if we have 2 independent Flink applications
>  A & B with JobManagers that just happen to be running on the same
> DataNode, they don’t share Blobservers, right?
>
>
>
> In regard to historical behavior, no, I haven’t seen these Blobserver
> connection problems until after the somaxconn config change. From an app
> perspective, the only way these ones are different is that they’re wide
> rather than deep i.e. large # of jobs to submit instead of a small handful
> of jobs with large amounts of data to process. If we have many jobs to
> submit, as soon as one is complete, we’re trying to submit the next.
>
>
>
> I saw an example today of an application using 10 TaskManagers w/ 2 slots
> with a total 194 jobs to submit with at most 20 running in parallel fail
> with the same error. I’m happy to try increasing both the concurrent
> connections and backlog to 128 and 2048 respectively, but I still can’t
> make sense of how a backlog of 1,000 connections is being met by 10 Task
> Managers/20 connections at worst.
>
>
>
> $ sysctl -a | grep net.core.somaxconn
>
> net.core.somaxconn = 1024
>
>
>
> *// *ah
>
>
>
> *From:* Chesnay Schepler 
> *Sent:* Thursday, October 1, 2020 1:41 PM
> *To:* Hailu, Andreas [Engineering] ; Till
> Rohrmann 
> *Cc:* user@flink.apache.org; Nico Kruber 
> *Subject:* Re: Blobserver dying mid-application
>
>
>
> All jobs running in a Flink session cluster talk to the same blob server.
>
>
>
> The time when tasks are submitted depends on the job; for streaming jobs
> all tasks are deployed when the job starts running; in case of batch jobs
> the submission can be staggered.
>
>
>
> I'm only aware of 2 cases where we transfer data via the blob server;
>
> a) retrieval of jars required for the user code to run  (this is what you
> see in the stack trace)
>
> b) retrieval of TaskInformation, which _should_ only happen if your job is
> quite large, but let's assume it does.
>
>
>
> For a) there should be at most numberOfSlots * numberOfTaskExecutors
> concurrent connections, in the worst case of each slot working on a
> different job, as each would download the jars for their respective job. If
> multiple slots are used for the same job at the same time, then the job jar
> is only retrieved once.
>
>
>
> For b) the limit should also be numberOfSlots * numberOfTaskExecutors; it
> is done once per task, and there are only so many tasks that can run at the
> same time.
>
>
>
> Thus from what I can tell there should be at most 104 (26 task executors *
> 2 slots * 2) concurrent attempts, of which only 54 should land in the
> backlog.
>
>
>
> Did you run into this issue before?
>
> If not, is this application different than your existing applications? Is
> the jar particularly big, jobs particularly short running or more complex
> than others.
>
>
>
> One thing to note is that the backlog relies entirely on OS functionality,
> which can be subject to an upper limit enforced by the OS.
>
> The configured backlog size is just a hint to the OS, but it may ignore
> it; it appears that 128 is not an uncommon upper limit, but maybe there are
> lower settings out there.
>
> You can check this limit via sysctl -a | grep net.core.somaxconn
>
> Maybe this value is set to 0, effectively disabling the backlog?
>
>
>
> It may also be worthwhile to monitor the number of such connections. (netstat
> -ant | grep -c SYN_REC)
>
>
>
> @Nico Do you have any ideas?
>
>
>
> On 10/1/2020 6:26 PM, Hailu, Andreas wrote:
>
> Hi Chesnay, Till, thanks for responding.
>
>
>
> @Chesnay:
>
> Apologies, I said cores when I meant slots J So a total of 26 Task
> managers with 2 slots each for a grand total of 52 parallelism.
>
>
>
> @Till:
>
> For this application, we have a grand total of 78 jobs, with some of them
> demanding more parallelism than others. Each job has multiple operators –
> depending on the size of the data we’re operating on, we could submit 1
> whopper with 52 parallelism, or multiple smaller jobs submitted in parallel
> with a sum of 52 parallelism. When does a 

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-02 Thread Till Rohrmann
Hi Austin,

yes, it should also work for ingestion time.

I am not entirely sure whether event time is preserved when converting a
Table into a retract stream. It should be possible and if it is not
working, then I guess it is a missing feature. But I am sure that @Timo
Walther  knows more about it. In doubt, you could
assign a new watermark generator when having obtained the retract stream.

Here is also a link to some information about event time and watermarks
[1]. Unfortunately, it does not state anything about the direction Table =>
DataStream.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html

Cheers,
Till

On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Till,
>
> Just a quick question on time characteristics -- this should work for
> IngestionTime as well, correct? Is there anything special I need to do to
> have the CsvTableSource/ toRetractStream call to carry through the assigned
> timestamps, or do I have to re-assign timestamps during the conversion? I'm
> currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp
> marker)` error, though I'm seeing timestamps being assigned if I step
> through the AutomaticWatermarkContext.
>
> Thanks,
> Austin
>
> On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Perfect, thanks so much Till!
>>
>> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Austin,
>>>
>>> I believe that the problem is the processing time window. Unlike for
>>> event time where we send a MAX_WATERMARK at the end of the stream to
>>> trigger all remaining windows, this does not happen for processing time
>>> windows. Hence, if your stream ends and you still have an open processing
>>> time window, then it will never get triggered.
>>>
>>> The problem should disappear if you use event time or if you process
>>> unbounded streams which never end.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey all,

 Thanks for your patience. I've got a small repo that reproduces the
 issue here: https://github.com/austince/flink-1.10-sql-windowing-error

 Not sure what I'm doing wrong but it feels silly.

 Thanks so much!
 Austin

 On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hey Till,
>
> Thanks for the reply -- I'll try to see if I can reproduce this in a
> small repo and share it with you.
>
> Best,
> Austin
>
> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
> wrote:
>
>> Hi Austin,
>>
>> could you share with us the exact job you are running (including the
>> custom window trigger)? This would help us to better understand your
>> problem.
>>
>> I am also pulling in Klou and Timo who might help with the windowing
>> logic and the Table to DataStream conversion.
>>
>> Cheers,
>> Till
>>
>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I'm not sure if I've missed something in the docs, but I'm having a
>>> bit of trouble with a streaming SQL job that starts w/ raw SQL queries 
>>> and
>>> then transitions to a more traditional streaming job. I'm on Flink 1.10
>>> using the Blink planner, running locally with no checkpointing.
>>>
>>> The job looks roughly like:
>>>
>>> CSV 1 -->
>>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time
>>> window w/ process func & custom trigger --> some other ops
>>> CSV 3 -->
>>>
>>>
>>> When I remove the windowing directly after the `toRetractStream`,
>>> the records make it to the "some other ops" stage, but with the 
>>> windowing,
>>> those operations are sometimes not sent any data. I can also get data 
>>> sent
>>> to the downstream operators by putting in a no-op map before the window 
>>> and
>>> placing some breakpoints in there to manually slow down processing.
>>>
>>>
>>> The logs don't seem to indicate anything went wrong and generally
>>> look like:
>>>
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>>> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>>> 4819 [Source: Custom File source (1/1)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>>> streams are closed for task Source: Custom File source (1/1)
>>> 

Re: Scala: Static methods in interface require -target:jvm-1.8

2020-10-02 Thread Arvid Heise
Also you could check if Java11 profile in Maven was (de)activated for some
reason.

On Mon, Sep 28, 2020 at 3:29 PM Piotr Nowojski  wrote:

> Hi,
>
> It sounds more like an Intellij issue, not a Flink issue. But have you
> checked your configured target language level for your modules?
>
> Best regards,
> Piotrek
>
> pon., 28 wrz 2020 o 10:57 Lu Weizheng 
> napisał(a):
>
>> Hi all,
>>
>> I recently upgraded Intellij IEDA from 2019 to 2020.2 Community Edition.
>> I didn’t do anything to Maven.
>> My code could compile correctly before. But now I get the following error:
>>
>> Static methods in interface require -target:jvm-1.8
>>
>> Probably because I use new WatermarkStrategy Scala API:
>>
>> .assignTimestampsAndWatermarks(
>>   WatermarkStrategy
>> .forBoundedOutOfOrderness(Duration.ofSeconds(1))
>> .withTimestampAssigner(new SerializableTimestampAssigner[(String,
>> Long, Int)] {
>>   override def extractTimestamp(t: (String, Long,
>> Int), l: Long): Long = t._2
>> })
>> )
>>
>> My project have both java and scala code. Here’s my POM.xml file:
>>
>> 
>>  
>> net.alchim31.maven
>> scala-maven-plugin
>> 
>> 
>>
>>   scala-compile-first
>>   process-resources
>>   
>>  compile
>>   
>>
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>  
>>
>> Is there something I need to add to my POM file?
>>
>>
>>
>>
>> Best Regards,
>> Weizheng Lu
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: SocketException: Too many open files

2020-10-02 Thread Arvid Heise
Hi Sateesh,

my suspicion would be that your custom Sink Function is leaking connections
(which also count for the file limit). Is there a reason that you cannot
use the ES connector of Flink?

I might have more ideas when you share your sink function.

Best,

Arvid

On Sun, Sep 27, 2020 at 7:16 PM mars  wrote:

> Hi,
>
>  I am using 1.10.0 version of Flink on EMR.
>
>  I am not using the Default Flink Sink. I have a Sink Function on the
> Stream
> and with in the invoke function i am creating a Data Structure (VO) and
> putting it in the Map.
>
>  The EMR Step function i am running is. a Spring based FLink Job and i have
> a scheduler which runs every min and looks for items in the Map and
> generates JSON based in the VO from the Map and send it to Elastic Search
> and removes it from the HashMap once it is sent to ES successfully.
>
>  I am using M5.2x large for worker nodes and M5.4xlarge for Master Node
>
>  I have set the ulimit to 500K for all users (*) . Both soft and hard limit
> on Master and worker nodes.
>
>  Thanks again for your response.
>
> Sateesh
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Thread Chesnay Schepler

Yes, the patch call only triggers the cancellation.
You can check whether it is complete by polling the job status via 
jobs/ and checking whether state is CANCELED.


On 9/27/2020 7:02 PM, Eleanore Jin wrote:
I have noticed this: if I have Thread.sleep(1500); after the patch 
call returned 202, then the directory gets cleaned up, in the 
meanwhile, it shows the job-manager pod is in completed state before 
getting terminated: see screenshot: https://ibb.co/3F8HsvG


So the patch call is async to terminate the job? Is there a way to 
check if cancel is completed? So that the stop tm and jm can be called 
afterwards?


Thanks a lot!
Eleanore


On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin > wrote:


Hi Congxian,
I am making rest call to get the checkpoint config: curl -X GET \

http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config


and here is the response:
{
    "mode": "at_least_once",
    "interval": 3000,
    "timeout": 1,
    "min_pause": 1000,
    "max_concurrent": 1,
    "externalization": {
        "enabled": false,
        "delete_on_cancellation": true
    },
    "state_backend": "FsStateBackend"
}

I uploaded a screenshot of how azure blob storage looks like after
the cancel call : https://ibb.co/vY64pMZ

Thanks a lot!
Eleanore

On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu
mailto:qcx978132...@gmail.com>> wrote:

Hi Eleanore
    What the `CheckpointRetentionPolicy`[1] did you set for
your job? if
`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set,
then the checkpoint will be kept when canceling a job.

PS the image did not show

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
Best,
Congxian


Eleanore Jin mailto:eleanore@gmail.com>> 于2020年9月27日周日
下午1:50写道:

Hi experts,

I am running flink 1.10.2 on kubernetes as per job
cluster. Checkpoint is enabled, with interval 3s,
minimumPause 1s, timeout 10s. I'm using FsStateBackend,
snapshots are persisted to azure blob storage (Microsoft
cloud storage service).

Checkpointed state is just source kafka topic offsets, the
flink job is stateless as it does filter/json transformation.

The way I am trying to stop the flink job is via
monitoring rest api mentioned in doc



e.g.
curl -X PATCH \
 

'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
\
  -H 'Content-Type: application/json' \
  -d '{}'

This call returned successfully with statusCode 202, then
I stopped the task manager pods and job manager pod.

According to the doc, the checkpoint should be cleaned up
after the job is stopped/cancelled.
What I have observed is, the checkpoint dir is not cleaned
up, can you please shield some lights on what I did wrong?

Below shows the checkpoint dir for a cancelled flink job.
image.png

Thanks!
Eleanore





Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Thread Chesnay Schepler

Yes, the patch call only triggers the cancellation.
You can check whether it is complete by polling the job status via 
jobs/ and checking whether state is CANCELED.


On 9/27/2020 7:02 PM, Eleanore Jin wrote:
I have noticed this: if I have Thread.sleep(1500); after the patch 
call returned 202, then the directory gets cleaned up, in the 
meanwhile, it shows the job-manager pod is in completed state before 
getting terminated: see screenshot: https://ibb.co/3F8HsvG


So the patch call is async to terminate the job? Is there a way to 
check if cancel is completed? So that the stop tm and jm can be called 
afterwards?


Thanks a lot!
Eleanore


On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin > wrote:


Hi Congxian,
I am making rest call to get the checkpoint config: curl -X GET \

http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config


and here is the response:
{
    "mode": "at_least_once",
    "interval": 3000,
    "timeout": 1,
    "min_pause": 1000,
    "max_concurrent": 1,
    "externalization": {
        "enabled": false,
        "delete_on_cancellation": true
    },
    "state_backend": "FsStateBackend"
}

I uploaded a screenshot of how azure blob storage looks like after
the cancel call : https://ibb.co/vY64pMZ

Thanks a lot!
Eleanore

On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu
mailto:qcx978132...@gmail.com>> wrote:

Hi Eleanore
    What the `CheckpointRetentionPolicy`[1] did you set for
your job? if
`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set,
then the checkpoint will be kept when canceling a job.

PS the image did not show

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
Best,
Congxian


Eleanore Jin mailto:eleanore@gmail.com>> 于2020年9月27日周日
下午1:50写道:

Hi experts,

I am running flink 1.10.2 on kubernetes as per job
cluster. Checkpoint is enabled, with interval 3s,
minimumPause 1s, timeout 10s. I'm using FsStateBackend,
snapshots are persisted to azure blob storage (Microsoft
cloud storage service).

Checkpointed state is just source kafka topic offsets, the
flink job is stateless as it does filter/json transformation.

The way I am trying to stop the flink job is via
monitoring rest api mentioned in doc



e.g.
curl -X PATCH \
 

'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
\
  -H 'Content-Type: application/json' \
  -d '{}'

This call returned successfully with statusCode 202, then
I stopped the task manager pods and job manager pod.

According to the doc, the checkpoint should be cleaned up
after the job is stopped/cancelled.
What I have observed is, the checkpoint dir is not cleaned
up, can you please shield some lights on what I did wrong?

Below shows the checkpoint dir for a cancelled flink job.
image.png

Thanks!
Eleanore





Re: Flink 1.12 snapshot throws ClassNotFoundException

2020-10-02 Thread Till Rohrmann
Great to hear that it works now :-)

On Fri, Oct 2, 2020 at 2:17 AM Lian Jiang  wrote:

> Thanks Till.  Making the scala version consistent using 2.11 solved the
> ClassNotFoundException.
>
> On Tue, Sep 29, 2020 at 11:58 PM Till Rohrmann 
> wrote:
>
>> Hi Lian,
>>
>> I suspect that it is caused by an incompatible Akka version. Flink uses
>> Akka 2.5.21 instead of 2.5.12. Moreover, you are mixing Flink jars which
>> use Scala 2.11 with Akka dependencies which are built against Scala 2.12.
>>
>> I am not an Gradle expert but can't Gradle simply pull in the transitive
>> dependencies of flink-runtime?
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 2:22 AM Lian Jiang  wrote:
>>
>>> Hi,
>>>
>>> I use Flink source master to build a snapshot and use the jars in my
>>> project. The goal is to avoid hacky deserialization code caused by avro 1.8
>>> in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
>>> code throws below ClassNotFoundException. I have verified that the
>>> akka-actor jar 2.5.12 is available and specified in -classpath. I can even
>>> create an object using akka/serialization/NullSerializer class in my
>>> application, indicating there is no problem for this app to use any class
>>> under namespace akka/serialization.
>>>
>>> Caused by: java.lang.NoClassDefFoundError:
>>> akka/serialization/BaseSerializer$class
>>> at
>>> akka.remote.serialization.MiscMessageSerializer.(MiscMessageSerializer.scala:25)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
>>> at scala.util.Try$.apply(Try.scala:213)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
>>> at scala.util.Success.flatMap(Try.scala:251)
>>> at
>>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
>>> at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
>>> at
>>> akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
>>> at
>>> scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
>>> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
>>> at
>>> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
>>> at
>>> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
>>> at akka.serialization.Serialization.(Serialization.scala:346)
>>> at
>>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
>>> at
>>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
>>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>>> at
>>> akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
>>> at scala.collection.Iterator.foreach(Iterator.scala:943)
>>> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>>> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>>> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>>> at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
>>> at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
>>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
>>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
>>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
>>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
>>> at
>>> akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
>>> at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
>>> at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
>>> at
>>> org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
>>> at
>>> org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
>>> ... 11 more
>>>
>>>
>>> This is my gradle:
>>>
>>> implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
>>> implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
>>> implementation 

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-10-02 Thread Arvid Heise
Hi Lian,

sorry for the late reply.

1. All serialization related functions are just implementation of API
interfaces. As such, you can implement serializers yourself. In this case,
you could simply copy the code from 1.12 into your application. You may
adjust a few things that are different between 1.11 and 1.12 though.
2. enableObjectReuse avoids copying of records between chained operators.
The chain ends with any keyby.
A possible workaround is to enableObjectReuse and then convert the datum
with logical types into a datum without logical types if possible
(potentially dropping a few fields to reduce network traffic along the way).

On Wed, Sep 23, 2020 at 6:11 PM Lian Jiang  wrote:

> Dawid,
>
> Thanks for the fix. I may wait for Flink 1.12 coming out at the end of Oct
> this year. Meanwhile, I may want to better understand the current solution
> at the beginning of this thread.
>
> My observations:
>
> 1. ProcessFunction with streamEnv.getConfig().enableObjectReuse() -->
> working
>
> 2. ProcessFunction without streamEnv.getConfig().enableObjectReuse() -->
> Not working
>
> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
> java.time.Instant: 2020-09-21T18:54:06.216Z
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
>
>
> 3. KeyedProcessFunction with streamEnv.getConfig().enableObjectReuse() -->
> Not working
>
> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type 
> java.time.Instant: 2020-09-21T19:52:58.477Z
>   at 
> org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
>   at 
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
>   at 
> org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
>   at 
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:272)
>   at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:143)
>   at 
> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
>   at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
>   at 
> org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
>   at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>   at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
>   at 
> 

Re: Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

2020-10-02 Thread Salva Alcántara
Awesome David, thanks for clarifying!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/