[jira] [Commented] (FLINK-33681) Display source/sink numRecordsIn/Out & numBytesIn/Out on UI

2024-05-28 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850176#comment-17850176
 ] 

Jiangjie Qin commented on FLINK-33681:
--

[~Zhanghao Chen] Thanks for filing the ticket. The solution sounds good to me. 
Are you interested in authoring the patch? I am happy to review.

BTW, technically speaking this is a user-visible behavior change, so a FLIP 
would be required. It is probably going to be a quick FLIP, though.

> Display source/sink numRecordsIn/Out & numBytesIn/Out on UI
> ---
>
> Key: FLINK-33681
> URL: https://issues.apache.org/jira/browse/FLINK-33681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.18.0, 1.17.2
>Reporter: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: screenshot-20231224-195605-1.png, 
> screenshot-20231225-120421.png
>
>
> Currently, the numRecordsIn & numBytesIn metrics for sources and the 
> numRecordsOut & numBytesOut metrics for sinks are always 0 on the Flink web 
> dashboard.
> FLINK-11576 brings us these metrics on the operator level, but it does not 
> integrate them on the task level. On the other hand, the summary metrics on 
> the job overview page are based on the task-level I/O metrics. As a result, 
> even though new connectors supporting FLIP-33 metrics report 
> operator-level I/O metrics, we still cannot see the metrics on the dashboard.
> This ticket serves as an umbrella issue to integrate standard source/sink I/O 
> metrics with the corresponding task I/O metrics. 
> !screenshot-20231224-195605-1.png|width=608,height=333!
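
For context, a minimal sketch (illustrative only; {{operatorMetricGroup}}, {{records}} and {{bytes}} are assumed variables) of the split this ticket describes: FLIP-33 connectors report I/O metrics on the operator metric group, while the job overview aggregates the task-level counters.

{code:java}
// Operator-level I/O metrics, as reported by FLIP-33 connectors.
OperatorIOMetricGroup ioMetrics = operatorMetricGroup.getIOMetricGroup();
ioMetrics.getNumRecordsInCounter().inc(records);
ioMetrics.getNumBytesInCounter().inc(bytes);
// The web UI summary is computed from the *task*-level counters, which stay at 0
// for sources/sinks until the operator-level values are integrated with them.
{code}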



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33733) [FLIP-321] Update the stability docs to add the migration periods.

2023-12-15 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-33733.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

> [FLIP-321] Update the stability docs to add the migration periods.
> --
>
> Key: FLINK-33733
> URL: https://issues.apache.org/jira/browse/FLINK-33733
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.19.0
>
>
> This ticket updates the docs to add the migration period for deprecated APIs, 
> which are specified in 
> [FLIP-321.|https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33733) [FLIP-321] Update the stability docs to add the migration periods.

2023-12-15 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797081#comment-17797081
 ] 

Jiangjie Qin commented on FLINK-33733:
--

Just realized that I forgot to make the PR title start with FLINK-33733, 
causing the PR link to be missing from the Jira ticket...

The PR link and git commit hash are as follows:
PR link: [https://github.com/apache/flink/pull/23865]
Patch merged to master: d4a3687aacdea61920098dd7814776655fde19db

> [FLIP-321] Update the stability docs to add the migration periods.
> --
>
> Key: FLINK-33733
> URL: https://issues.apache.org/jira/browse/FLINK-33733
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
>
> This ticket updates the docs to add the migration period for deprecated APIs, 
> which are specified in 
> [FLIP-321.|https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33733) [FLIP-321] Update the stability docs to add the migration periods.

2023-12-03 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-33733:


Assignee: Jiangjie Qin

> [FLIP-321] Update the stability docs to add the migration periods.
> --
>
> Key: FLINK-33733
> URL: https://issues.apache.org/jira/browse/FLINK-33733
> Project: Flink
>  Issue Type: New Feature
>  Components: Documentation
>Affects Versions: 1.19.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
>
> This ticket updates the docs to add the migration period for deprecated APIs, 
> which are specified in 
> [FLIP-321.|https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33733) [FLIP-321] Update the stability docs to add the migration periods.

2023-12-03 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-33733:


 Summary: [FLIP-321] Update the stability docs to add the migration 
periods.
 Key: FLINK-33733
 URL: https://issues.apache.org/jira/browse/FLINK-33733
 Project: Flink
  Issue Type: New Feature
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Jiangjie Qin


This ticket updates the docs to add the migration period for deprecated APIs, 
which are specified in 
[FLIP-321.|https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-20767) [FLIP-356] Add nested field support for SupportsFilterPushDown

2023-09-26 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-20767.
--
Fix Version/s: 1.19.0
   Resolution: Implemented

PR is merged to master: 5be4688e8b8a055b9e8891c265f6f354083d630f

> [FLIP-356] Add nested field support for SupportsFilterPushDown
> --
>
> Key: FLINK-20767
> URL: https://issues.apache.org/jira/browse/FLINK-20767
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jun Zhang
>Assignee: Venkata krishnan Sowrirajan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> I think we should add nested field support for SupportsFilterPushDown.
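
For illustration, a hedged sketch (hypothetical table; not code from the PR) of a query whose nested-field predicate could be pushed into a {{SupportsFilterPushDown}} source once this is supported:

{code:java}
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql(
        "CREATE TABLE users ("
                + " user_info ROW<name STRING, age INT>"
                + ") WITH ("
                + " 'connector' = 'datagen'"
                + ")");
// With nested field pushdown, the source can accept the predicate on
// user_info.age instead of leaving it to a separate Flink filter operator.
Table adults = tEnv.sqlQuery("SELECT * FROM users WHERE user_info.age > 18");
{code}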



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32398) Support Avro SpecificRecord in DataStream and Table conversion.

2023-06-20 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735328#comment-17735328
 ] 

Jiangjie Qin commented on FLINK-32398:
--

[~twalthr] [~jark] If this already works and I missed something, please let me 
know.

Orthogonal to this ticket, the {{AvroSchemaConverter}} class is not marked as 
Public, but from the commit log it seems this should be a public API. The same 
issue exists for the other classes in the flink-avro package. Actually, none of 
the classes/interfaces in the entire flink-avro package are marked as Public.

> Support Avro SpecificRecord in DataStream and Table conversion.
> ---
>
> Key: FLINK-32398
> URL: https://issues.apache.org/jira/browse/FLINK-32398
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.1
>Reporter: Jiangjie Qin
>Priority: Major
>
> At this point, it seems that Avro SpecificRecord is not supported in 
> DataStream and Table conversion. For example, the following code breaks when 
> MyAvroRecord contains fields of type Record, Enum, Array, etc.
>  
> {code:java}
> String schemaString = MyAvroRecord.getClassSchema().toString();
> DataType dataType = AvroSchemaConverter.convertToDataType(schemaString);
> TypeInformation typeInfo = 
> AvroSchemaConverter.convertToTypeInfo(schemaString);
> input.getTransformation().setOutputType(typeInfo);
> tEnv.createTemporaryView("myTable", input);
> Table result = tEnv.sqlQuery("SELECT * FROM myTable");
> DataStream output = tEnv.toDataStream(result, dataType);
> output.getTransformation().setOutputType(typeInfo); {code}
>  
> While the conversion from {{MyAvroRecord}} to {{RowData}} seems fine, several 
> issues arise when converting the {{RowData}} back to 
> {{MyAvroRecord}}, including but not limited to:
>  # {{AvroSchemaConverter.convertToDataType(schema)}} maps the Avro Record type 
> to RowType, which loses the class information.
>  # {{AvroSchemaConverter}} maps Enum to StringType, and simply tries to cast 
> the string to the Enum.
> I did not find a way to easily convert between DataStream and Table for 
> Avro SpecificRecord. Given the popularity of Avro SpecificRecord, we should 
> support this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32398) Support Avro SpecificRecord in DataStream and Table conversion.

2023-06-20 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-32398:


 Summary: Support Avro SpecificRecord in DataStream and Table 
conversion.
 Key: FLINK-32398
 URL: https://issues.apache.org/jira/browse/FLINK-32398
 Project: Flink
  Issue Type: New Feature
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.1
Reporter: Jiangjie Qin


At this point, it seems that Avro SpecificRecord is not supported in DataStream 
and Table conversion. For example, the following code breaks when MyAvroRecord 
contains fields of type Record, Enum, Array, etc.

 
{code:java}
String schemaString = MyAvroRecord.getClassSchema().toString();
DataType dataType = AvroSchemaConverter.convertToDataType(schemaString);
TypeInformation typeInfo = 
AvroSchemaConverter.convertToTypeInfo(schemaString);

input.getTransformation().setOutputType(typeInfo);
tEnv.createTemporaryView("myTable", input);
Table result = tEnv.sqlQuery("SELECT * FROM myTable");
DataStream output = tEnv.toDataStream(result, dataType);
output.getTransformation().setOutputType(typeInfo); {code}
 

While the conversion from {{MyAvroRecord}} to {{RowData}} seems fine, several 
issues arise when converting the {{RowData}} back to {{MyAvroRecord}}, 
including but not limited to:
 # {{AvroSchemaConverter.convertToDataType(schema)}} maps the Avro Record type to 
RowType, which loses the class information.
 # {{AvroSchemaConverter}} maps Enum to StringType, and simply tries to cast the 
string to the Enum.

I did not find a way to easily convert between DataStream and Table for 
Avro SpecificRecord. Given the popularity of Avro SpecificRecord, we should 
support this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31654) DataStreamUtils.reinterpretAsKeyedStream() should not override the user specified chaining strategy.

2023-03-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-31654:
-
Description: 
Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
with batch jobs: the chaining strategy of the StreamOperators applied 
to a KeyedStream is always overridden to HEAD. This is because in batch 
execution mode the records have to be sorted by key before they are fed to the 
stateful operators. The runtime relies on the shuffle to do the sort, so a 
shuffle is needed for the stateful operators.

However, for {{DataStreamUtils.reinterpretAsKeyedStream()}} this results in 
unexpected behavior. It breaks the operator chain and defeats the purpose of 
reinterpreting the stream instead of calling {{keyBy()}}.

To fix this issue, we need to do the following for reinterpretAsKeyedStream:
 # Add a sort operator instead of relying on the shuffle to do the sort.
 # Stop overriding the chaining strategy specified by the user for the 
operators applied to the result KeyedStream.

  was:
Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
with batch jobs. There are two issues:
 # The input to the downstream operator will not be sorted. So users need to 
sort the records by themselves.
 # The result {{KeyedStream}} will still override the chaining strategy of the 
downstream operator to {{HEAD}}. This breaks the operator chain and defeats the 
purpose of reinterpreting the stream instead of calling {{keyBy()}}.

This ticket aims to address the second issue.


> DataStreamUtils.reinterpretAsKeyedStream() should not override the user 
> specified chaining strategy.
> 
>
> Key: FLINK-31654
> URL: https://issues.apache.org/jira/browse/FLINK-31654
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.14.6, 1.16.1, 1.15.4
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
> with batch jobs: the chaining strategy of the StreamOperators 
> applied to a KeyedStream is always overridden to HEAD. This is because in 
> batch execution mode the records have to be sorted by key before they are 
> fed to the stateful operators. The runtime relies on the shuffle to do the 
> sort, so a shuffle is needed for the stateful operators.
> However, for {{DataStreamUtils.reinterpretAsKeyedStream()}} this results in 
> unexpected behavior. It breaks the operator chain and defeats the purpose of 
> reinterpreting the stream instead of calling {{keyBy()}}.
> To fix this issue, we need to do the following for reinterpretAsKeyedStream:
>  # Add a sort operator instead of relying on the shuffle to do the sort.
>  # Stop overriding the chaining strategy specified by the user for the 
> operators applied to the result KeyedStream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31654) DataStreamUtils.reinterpretAsKeyedStream() should not override the user specified chaining strategy.

2023-03-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-31654:
-
Fix Version/s: 1.16.2

> DataStreamUtils.reinterpretAsKeyedStream() should not override the user 
> specified chaining strategy.
> 
>
> Key: FLINK-31654
> URL: https://issues.apache.org/jira/browse/FLINK-31654
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.14.6, 1.16.1, 1.15.4
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
> with batch jobs. There are two issues:
>  # The input to the downstream operator will not be sorted. So users need to 
> sort the records by themselves.
>  # The result {{KeyedStream}} will still override the chaining strategy of 
> the downstream operator to {{HEAD}}. This breaks the operator chain and 
> defeats the purpose of reinterpreting the stream instead of calling {{keyBy()}}.
> This ticket aims to address the second issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31654) DataStreamUtils.reinterpretAsKeyedStream() should not override the user specified chaining strategy.

2023-03-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-31654:
-
Fix Version/s: 1.18.0
   1.17.1

> DataStreamUtils.reinterpretAsKeyedStream() should not override the user 
> specified chaining strategy.
> 
>
> Key: FLINK-31654
> URL: https://issues.apache.org/jira/browse/FLINK-31654
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.14.6, 1.16.1, 1.15.4
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.18.0, 1.17.1
>
>
> Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
> with batch jobs. There are two issues:
>  # The input to the downstream operator will not be sorted. So users need to 
> sort the records by themselves.
>  # The result {{KeyedStream}} will still override the chaining strategy of 
> the downstream operator to {{HEAD}}. This breaks the operator chain and 
> defeats the purpose of reinterpreting the stream instead of calling {{keyBy()}}.
> This ticket aims to address the second issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31654) DataStreamUtils.reinterpretAsKeyedStream() should not override the user specified chaining strategy.

2023-03-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-31654:


Assignee: Jiangjie Qin

> DataStreamUtils.reinterpretAsKeyedStream() should not override the user 
> specified chaining strategy.
> 
>
> Key: FLINK-31654
> URL: https://issues.apache.org/jira/browse/FLINK-31654
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.17.0, 1.14.6, 1.16.1, 1.15.4
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.18.0, 1.17.1
>
>
> Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
> with batch jobs. There are two issues:
>  # The input to the downstream operator will not be sorted. So users need to 
> sort the records by themselves.
>  # The result {{KeyedStream}} will still override the chaining strategy of 
> the downstream operator to {{HEAD}}. This breaks the operator chain and 
> defeats the purpose of reinterpreting the stream instead of calling {{keyBy()}}.
> This ticket aims to address the second issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31654) DataStreamUtils.reinterpretAsKeyedStream() should not override the user specified chaining strategy.

2023-03-29 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-31654:


 Summary: DataStreamUtils.reinterpretAsKeyedStream() should not 
override the user specified chaining strategy.
 Key: FLINK-31654
 URL: https://issues.apache.org/jira/browse/FLINK-31654
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.15.4, 1.16.1, 1.14.6, 1.17.0
Reporter: Jiangjie Qin


Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well 
with batch jobs. There are two issues:
 # The input to the downstream operator will not be sorted. So users need to 
sort the records by themselves.
 # The result {{KeyedStream}} will still override the chaining strategy of the 
downstream operator to {{HEAD}}. This breaks the operator chain and defeats the 
purpose of reinterpreting the stream instead of calling {{keyBy()}}.

This ticket aims to address the second issue.
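
For reference, a minimal sketch (hypothetical job; the pipeline and key selector are illustrative, not from this ticket) of the API involved. {{reinterpretAsKeyedStream()}} treats an already-partitioned stream as keyed without a {{keyBy()}} shuffle, which is why forcing downstream operators to HEAD chaining defeats its purpose:

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<Tuple2<String, Long>> prePartitioned =
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

KeyedStream<Tuple2<String, Long>, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(
                prePartitioned,
                new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                });

// Without the fix, this operator is forced to ChainingStrategy.HEAD,
// breaking the chain with its upstream operator.
keyed.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)).print();

env.execute();
{code}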



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31324) Broken SingleThreadFetcherManager constructor API

2023-03-06 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17696756#comment-17696756
 ] 

Jiangjie Qin commented on FLINK-31324:
--

I think `SingleThreadFetcherManager` is indeed somewhat public at the moment. 
Connector implementations extend this class from time to time. So we probably 
need to make it backwards compatible even though it is marked as internal.

It also looks OK to make it PublicEvolving. If we do so, the only additional 
class that we would need to make public, also as PublicEvolving, is 
`FutureCompletingBlockingQueue`. So it does not expose many unnecessary classes 
to the users.
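
For reference, a minimal sketch of what keeping the old constructor backwards compatible could look like (assuming, as the ticket suggests, the original constructor is deprecated rather than removed; the delegation target and its {{Configuration}} argument are assumptions, not the committed fix):

{code:java}
// Keep the pre-1.17 constructor, deprecated, delegating to the new one so that
// existing connector binaries keep linking against it.
@Deprecated
public SingleThreadFetcherManager(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
    this(elementsQueue, splitReaderSupplier, new Configuration());
}
{code}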

> Broken SingleThreadFetcherManager constructor API
> -
>
> Key: FLINK-31324
> URL: https://issues.apache.org/jira/browse/FLINK-31324
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Parent
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> FLINK-28853 changed the default constructor of 
> {{SingleThreadFetcherManager}}. Though the {{SingleThreadFetcherManager}} is 
> annotated as {{Internal}}, it actually acts as a de facto public API, which 
> is widely used in many connector projects:
> [flink-cdc-connector|https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93],
>  
> [flink-connector-mongodb|https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58]
>  and so on.
> Once flink-1.17 is released, all these existing connectors will be broken and 
> cannot be used with the new release version, and will throw exceptions like:
> {code:java}
> java.lang.NoSuchMethodError: 
> org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.<init>(Lorg/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue;Ljava/util/function/Supplier;)V
>   at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader.<init>(MySqlSourceReader.java:91)
>  ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
>   at 
> com.ververica.cdc.connectors.mysql.source.MySqlSource.createReader(MySqlSource.java:159)
>  ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:312)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:699)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) 
> ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
> ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
> {code}
> Thus, I suggest making the original SingleThreadFetcherManager constructor 
> deprecated instead of removing it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-09 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686336#comment-17686336
 ] 

Jiangjie Qin commented on FLINK-30935:
--

Sounds good to me.

> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
> return CURRENT_VERSION;
> }
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws 
> IOException {
> try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
> DataInputStream in = new DataInputStream(bais)) {
> String topic = in.readUTF();
> int partition = in.readInt();
> long offset = in.readLong();
> long stoppingOffset = in.readLong();
> return new KafkaPartitionSplit(
> new TopicPartition(topic, partition), offset, stoppingOffset);
> }
> } {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, like many other connectors' implementations do, 
> to guard against incompatible or corrupt state when restoring from a checkpoint.
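
A minimal sketch of the kind of check being proposed (wording assumed; not the committed fix):

{code:java}
@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    if (version != CURRENT_VERSION) {
        throw new IOException(
                String.format(
                        "The serialized bytes have version %d, but this deserializer "
                                + "only supports version %d.",
                        version, CURRENT_VERSION));
    }
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}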



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30645) [FLIP-286] The scope/stability annotation in AbstractStreamOperator are inconsistent.

2023-01-12 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-30645:


 Summary: [FLIP-286] The scope/stability annotation in 
AbstractStreamOperator are inconsistent.
 Key: FLINK-30645
 URL: https://issues.apache.org/jira/browse/FLINK-30645
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.16.0
Reporter: Jiangjie Qin


It looks like the {{AbstractStreamOperator}} API currently has some 
scope/stability annotation inconsistencies. More specifically,
 * The {{AbstractStreamOperator}} class is marked as {{@PublicEvolving}}
 * {{AbstractStreamOperator.getInternalTimerService()}} returns a type of 
{{InternalTimerService}}, which is marked as {{@Internal}}
 * {{InternalOperatorMetricGroup}} and {{InternalOperatorIOMetricGroup}} 
are also available to the subclasses of {{AbstractStreamOperator}} but marked 
as {{@Internal}}.

FLIP-286 proposes to fix the above annotation inconsistency by marking the 
following classes {{@PublicEvolving}}:
 * org.apache.flink.streaming.api.operators.InternalTimerService
 * org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup
 * org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup
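
A minimal sketch (hypothetical operator; for illustration only) of how these {{@Internal}} types leak into user subclasses of the {{@PublicEvolving}} {{AbstractStreamOperator}}:

{code:java}
public class MyTimerOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long>, Triggerable<Long, VoidNamespace> {

    // InternalTimerService is marked @Internal.
    private transient InternalTimerService<VoidNamespace> timerService;

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService("timers", VoidNamespaceSerializer.INSTANCE, this);
    }

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
        output.collect(element);
    }

    @Override
    public void onEventTime(InternalTimer<Long, VoidNamespace> timer) throws Exception {
        output.collect(new StreamRecord<>(timer.getTimestamp()));
    }

    @Override
    public void onProcessingTime(InternalTimer<Long, VoidNamespace> timer) throws Exception {}
}
{code}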

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30424) Add source operator restore readerState log to distinguish split is from newPartitions or split state

2022-12-16 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-30424.
--
Resolution: Fixed

Merged to master: 528186b62da92ecde0fa308f1df0cc6f95495f4d

> Add source operator restore readerState log to distinguish split is from 
> newPartitions or split state
> -
>
> Key: FLINK-30424
> URL: https://issues.apache.org/jira/browse/FLINK-30424
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.16.0, 1.15.3, 1.16.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> When a job starts for the first time, we can find 'assignPartitions' in the 
> log. But if the source recovers from state, we cannot distinguish whether the 
> newPartitions came from the periodic discovery thread or from the reader task 
> state.
> We can add a helper log to distinguish these cases and confirm that the reader 
> uses the split state in the recovery situation. It's very useful for 
> troubleshooting.
>  
>  
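
A sketch of the kind of helper log proposed (wording assumed; `splits` is the restored split list):

{code:java}
// Emitted when the reader is handed splits recovered from checkpointed state,
// so they can be told apart from splits assigned by periodic partition discovery.
if (!splits.isEmpty()) {
    LOG.info("Restoring {} split(s) from reader state: {}", splits.size(), splits);
}
{code}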



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30424) Add source operator restore readerState log to distinguish split is from newPartitions or split state

2022-12-16 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-30424:
-
Fix Version/s: 1.17.0

> Add source operator restore readerState log to distinguish split is from 
> newPartitions or split state
> -
>
> Key: FLINK-30424
> URL: https://issues.apache.org/jira/browse/FLINK-30424
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.16.0, 1.15.3, 1.16.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When a job starts for the first time, we can find 'assignPartitions' in the 
> log. But if the source recovers from state, we cannot distinguish whether the 
> newPartitions came from the periodic discovery thread or from the reader task 
> state.
> We can add a helper log to distinguish these cases and confirm that the reader 
> uses the split state in the recovery situation. It's very useful for 
> troubleshooting.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28411) OperatorCoordinator exception may fail Session Cluster

2022-07-05 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562909#comment-17562909
 ] 

Jiangjie Qin commented on FLINK-28411:
--

[~martijnvisser] Yes, I think this is a problem. I need to check a bit more on 
the JM initialization logic to see what the best fix is. As Stephan mentioned 
in the other ticket, putting the initialization in the enumerator thread could 
be an option. It postpones the exception handling until after the JM 
initialization finishes, at which point the JM will be able to handle the 
per-job global failure correctly.
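
A rough sketch of that option (names and structure assumed; not actual Flink code):

{code:java}
// Start the enumerator on the coordinator's own executor. A start-up failure then
// surfaces after JM initialization and can be handled as a per-job global failure
// via OperatorCoordinator.Context#failJob, instead of reaching the fatal error
// handler and killing the whole session cluster.
coordinatorExecutor.execute(
        () -> {
            try {
                enumerator.start();
            } catch (Throwable t) {
                context.failJob(t);
            }
        });
{code}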

> OperatorCoordinator exception may fail Session Cluster
> --
>
> Key: FLINK-28411
> URL: https://issues.apache.org/jira/browse/FLINK-28411
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Reporter: Daren Wong
>Priority: Major
> Fix For: 1.15.2
>
>
> Part of Scheduler's startScheduling procedure involves starting all 
> OperatorCoordinatorHolder, and when one of the OperatorCoordinator fails to 
> start, the exception is forwarded up the stack triggering a JobMaster 
> failover. However, JobMaster failover only works if HA is enabled[1]. If HA 
> is not enabled the fatal error handler will simply exit the JM process 
> killing the entire cluster. This is problematic in the case of a session 
> cluster where there may be multiple jobs running. It also does not play well 
> with external tooling that does not expect job failure to cause a full 
> cluster failure. 
>  
> It would be preferable if failure to start an OperatorCoordinator did not 
> take down the entire cluster, but instead failed that particular job. 
>  
> This issue is similar to https://issues.apache.org/jira/browse/FLINK-24303, 
> which fixed this issue for the SourceCoordinator specifically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27554) The asf-site does not build on Apple Silicon

2022-05-14 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536975#comment-17536975
 ] 

Jiangjie Qin commented on FLINK-27554:
--

Thanks for digging into this, [~xtsong]. I think this is good progress. I am 
wondering if [~vthinkxie] would have some ideas about how to make this work.

> The asf-site does not build on Apple Silicon
> 
>
> Key: FLINK-27554
> URL: https://issues.apache.org/jira/browse/FLINK-27554
> Project: Flink
>  Issue Type: Sub-task
>  Components: Project Website
>Affects Versions: 1.15.0
>Reporter: Jiangjie Qin
>Priority: Major
>
> It looks like the ASF website does not build on my laptop with Apple Silicon. 
> It errors out when installing libv8 via the following command:
> {noformat}
> gem install libv8 -v '3.16.14.19' --source 'https://rubygems.org/'
> {noformat}
> The error logs are as follows:
> {noformat}
> current directory: /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8
> /System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/bin/ruby -I 
> /System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/lib/ruby/2.6.0 -r 
> ./siteconf20220509-16154-19vsxkp.rb extconf.rb
> creating Makefile
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-building-tests.patch
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-werror-on-osx.patch
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-xcode-debugging.patch
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-imply-vfp3-and-armv7.patch
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-MAP_NORESERVE-on-freebsd.patch
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-vfp2.patch
> Applying 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/fPIC-for-static.patch
> Compiling v8 for x64
> Using python 2.7.18
> Using compiler: c++ (clang version 13.1.6)
> Unable to find a compiler officially supported by v8.
> It is recommended to use GCC v4.4 or higher
> Beginning compilation. This will take some time.
> Building v8 with env CXX=c++ LINK=c++  /usr/bin/make x64.release vfp2=off 
> vfp3=on hardfp=on ARFLAGS.target=crs werror=no
> GYP_GENERATORS=make \
>  build/gyp/gyp --generator-output="out" build/all.gyp \
>                -Ibuild/standalone.gypi --depth=. \
>                -Dv8_target_arch=x64 \
>                -S.x64  -Dv8_enable_backtrace=1 
> -Dv8_can_use_vfp2_instructions=false -Dv8_can_use_vfp3_instructions=true 
> -Darm_fpu=vfpv3 -Dwerror='' -Dv8_use_arm_eabi_hardfloat=true
>   CXX(target) 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o
> clang: warning: include path for libstdc++ headers not found; pass 
> '-stdlib=libc++' on the command line to use the libc++ standard library 
> instead [-Wstdlibcxx-not-found]
> In file included from ../src/allocation.cc:33:
> ../src/utils.h:33:10: fatal error: 'climits' file not found
> #include <climits>
>          ^
> 1 error generated.
> make[1]: *** 
> [/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o]
>  Error 1
> make: *** [x64.release] Error 2
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:36:in 
> `block in verify_installation!': libv8 did not install properly, expected 
> binary v8 archive 
> '/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/tools/gyp/libv8_base.a' to
>  exist, but it was not found (Libv8::Location::Vendor::ArchiveNotFound)
>  from 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
> `each'
>  from 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
> `verify_installation!'
>  from 
> /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:26:in 
> `install!'
>  from extconf.rb:7:in `<main>'
>  
> extconf failed, exit code 1
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27554) The asf-site does not build on Apple Silicon

2022-05-09 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-27554:
-
Description: 
It looks like the ASF website does not build on my laptop with Apple Silicon. 
It errors out when installing libv8 via the following command:
{noformat}
gem install libv8 -v '3.16.14.19' --source 'https://rubygems.org/'
{noformat}
The error logs are as follows:
{noformat}
current directory: /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8

/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/bin/ruby -I 
/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/lib/ruby/2.6.0 -r 
./siteconf20220509-16154-19vsxkp.rb extconf.rb

creating Makefile

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-building-tests.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-werror-on-osx.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-xcode-debugging.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-imply-vfp3-and-armv7.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-MAP_NORESERVE-on-freebsd.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-vfp2.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/fPIC-for-static.patch

Compiling v8 for x64

Using python 2.7.18

Using compiler: c++ (clang version 13.1.6)

Unable to find a compiler officially supported by v8.

It is recommended to use GCC v4.4 or higher

Beginning compilation. This will take some time.

Building v8 with env CXX=c++ LINK=c++  /usr/bin/make x64.release vfp2=off 
vfp3=on hardfp=on ARFLAGS.target=crs werror=no

GYP_GENERATORS=make \

 build/gyp/gyp --generator-output="out" build/all.gyp \

               -Ibuild/standalone.gypi --depth=. \

               -Dv8_target_arch=x64 \

               -S.x64  -Dv8_enable_backtrace=1 
-Dv8_can_use_vfp2_instructions=false -Dv8_can_use_vfp3_instructions=true 
-Darm_fpu=vfpv3 -Dwerror='' -Dv8_use_arm_eabi_hardfloat=true

  CXX(target) 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o

clang: warning: include path for libstdc++ headers not found; pass 
'-stdlib=libc++' on the command line to use the libc++ standard library instead 
[-Wstdlibcxx-not-found]

In file included from ../src/allocation.cc:33:

../src/utils.h:33:10: fatal error: 'climits' file not found

#include <climits>

         ^

1 error generated.

make[1]: *** 
[/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o]
 Error 1

make: *** [x64.release] Error 2

/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:36:in 
`block in verify_installation!': libv8 did not install properly, expected 
binary v8 archive 
'/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/tools/gyp/libv8_base.a' to
 exist, but it was not found (Libv8::Location::Vendor::ArchiveNotFound)

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
`each'

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
`verify_installation!'

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:26:in 
`install!'

 from extconf.rb:7:in `<main>'

 

extconf failed, exit code 1

{noformat}

  was:
It looks like the ASF website does not build on my laptop with Apple Silicon. 
It errors out when installing libv8 via the following command:

{noformat}

gem install libv8 -v '3.16.14.19' --source 'https://rubygems.org/'

{noformat}

The error logs are as follows:
{noformat}
current directory: /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8

/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/bin/ruby -I 
/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/lib/ruby/2.6.0 -r 
./siteconf20220509-16154-19vsxkp.rb extconf.rb

creating Makefile

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-building-tests.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-werror-on-osx.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-xcode-debugging.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-imply-vfp3-and-armv7.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-MAP_NORESERVE-on-freebsd.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-vfp2.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/fPIC-for-static.patch

Compiling v8 for x64

Using python 2.7.18

Using compiler: c++ (clang version 13.1.6)

Unable to find a compiler officially supported by v8.

It is recommended to use GCC v4.4 or higher

Beginning compilation. This will take some 

[jira] [Updated] (FLINK-27554) The asf-site does not build on Apple Silicon

2022-05-09 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-27554:
-
Description: 
It looks like the ASF website does not build on my laptop with Apple Silicon. 
It errors out when installing libv8 via the following command:

{noformat}

gem install libv8 -v '3.16.14.19' --source 'https://rubygems.org/'

{noformat}

The error logs are as follows:
{noformat}
current directory: /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8

/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/bin/ruby -I 
/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/lib/ruby/2.6.0 -r 
./siteconf20220509-16154-19vsxkp.rb extconf.rb

creating Makefile

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-building-tests.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-werror-on-osx.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-xcode-debugging.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-imply-vfp3-and-armv7.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-MAP_NORESERVE-on-freebsd.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-vfp2.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/fPIC-for-static.patch

Compiling v8 for x64

Using python 2.7.18

Using compiler: c++ (clang version 13.1.6)

Unable to find a compiler officially supported by v8.

It is recommended to use GCC v4.4 or higher

Beginning compilation. This will take some time.

Building v8 with env CXX=c++ LINK=c++  /usr/bin/make x64.release vfp2=off 
vfp3=on hardfp=on ARFLAGS.target=crs werror=no

GYP_GENERATORS=make \

 build/gyp/gyp --generator-output="out" build/all.gyp \

               -Ibuild/standalone.gypi --depth=. \

               -Dv8_target_arch=x64 \

               -S.x64  -Dv8_enable_backtrace=1 
-Dv8_can_use_vfp2_instructions=false -Dv8_can_use_vfp3_instructions=true 
-Darm_fpu=vfpv3 -Dwerror='' -Dv8_use_arm_eabi_hardfloat=true

  CXX(target) 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o

clang: warning: include path for libstdc++ headers not found; pass 
'-stdlib=libc++' on the command line to use the libc++ standard library instead 
[-Wstdlibcxx-not-found]

In file included from ../src/allocation.cc:33:

../src/utils.h:33:10: fatal error: 'climits' file not found

#include <climits>

         ^

1 error generated.

make[1]: *** 
[/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o]
 Error 1

make: *** [x64.release] Error 2

/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:36:in 
`block in verify_installation!': libv8 did not install properly, expected 
binary v8 archive 
'/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/tools/gyp/libv8_base.a' to
 exist, but it was not found (Libv8::Location::Vendor::ArchiveNotFound)

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
`each'

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
`verify_installation!'

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:26:in 
`install!'

 from extconf.rb:7:in `<main>'

 

extconf failed, exit code 1

{noformat}

  was:
It looks like the ASF website does not build on my laptop with Apple Silicon. 
It errors out when installing libv8. The error logs are as follows:

{noformat}

current directory: /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8

/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/bin/ruby -I 
/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/lib/ruby/2.6.0 -r 
./siteconf20220509-16154-19vsxkp.rb extconf.rb

creating Makefile

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-building-tests.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-werror-on-osx.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-xcode-debugging.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-imply-vfp3-and-armv7.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-MAP_NORESERVE-on-freebsd.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-vfp2.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/fPIC-for-static.patch

Compiling v8 for x64

Using python 2.7.18

Using compiler: c++ (clang version 13.1.6)

Unable to find a compiler officially supported by v8.

It is recommended to use GCC v4.4 or higher

Beginning compilation. This will take some time.

Building v8 with env CXX=c++ LINK=c++  /usr/bin/make x64.release vfp2=off 
vfp3=on hardfp=on 

[jira] [Created] (FLINK-27554) The asf-site does not build on Apple Silicon

2022-05-09 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-27554:


 Summary: The asf-site does not build on Apple Silicon
 Key: FLINK-27554
 URL: https://issues.apache.org/jira/browse/FLINK-27554
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Affects Versions: 1.15.0
Reporter: Jiangjie Qin


It looks like the ASF website does not build on my laptop with Apple Silicon. 
It errors out when installing libv8. The error logs are as follows:

{noformat}

current directory: /Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8

/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/bin/ruby -I 
/System/Library/Frameworks/Ruby.framework/Versions/2.6/usr/lib/ruby/2.6.0 -r 
./siteconf20220509-16154-19vsxkp.rb extconf.rb

creating Makefile

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-building-tests.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-werror-on-osx.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/disable-xcode-debugging.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-imply-vfp3-and-armv7.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-MAP_NORESERVE-on-freebsd.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/do-not-use-vfp2.patch

Applying 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/patches/fPIC-for-static.patch

Compiling v8 for x64

Using python 2.7.18

Using compiler: c++ (clang version 13.1.6)

Unable to find a compiler officially supported by v8.

It is recommended to use GCC v4.4 or higher

Beginning compilation. This will take some time.

Building v8 with env CXX=c++ LINK=c++  /usr/bin/make x64.release vfp2=off 
vfp3=on hardfp=on ARFLAGS.target=crs werror=no

GYP_GENERATORS=make \

 build/gyp/gyp --generator-output="out" build/all.gyp \

               -Ibuild/standalone.gypi --depth=. \

               -Dv8_target_arch=x64 \

               -S.x64  -Dv8_enable_backtrace=1 
-Dv8_can_use_vfp2_instructions=false -Dv8_can_use_vfp3_instructions=true 
-Darm_fpu=vfpv3 -Dwerror='' -Dv8_use_arm_eabi_hardfloat=true

  CXX(target) 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o

clang: warning: include path for libstdc++ headers not found; pass 
'-stdlib=libc++' on the command line to use the libc++ standard library instead 
[-Wstdlibcxx-not-found]

In file included from ../src/allocation.cc:33:

../src/utils.h:33:10: fatal error: 'climits' file not found

#include <climits>

         ^

1 error generated.

make[1]: *** 
[/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/preparser_lib/src/allocation.o]
 Error 1

make: *** [x64.release] Error 2

/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:36:in 
`block in verify_installation!': libv8 did not install properly, expected 
binary v8 archive 
'/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/vendor/v8/out/x64.release/obj.target/tools/gyp/libv8_base.a' to
 exist, but it was not found (Libv8::Location::Vendor::ArchiveNotFound)

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
`each'

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:35:in 
`verify_installation!'

 from 
/Library/Ruby/Gems/2.6.0/gems/libv8-3.16.14.19/ext/libv8/location.rb:26:in 
`install!'

 from extconf.rb:7:in `<main>'

 

extconf failed, exit code 1

{noformat}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-27295) UnalignedCheckpointITCase failed due to OperatorCoordinatorHolder cannot mark checkpoint

2022-04-19 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-27295.

Resolution: Duplicate

Duplicate of FLINK-27148

> UnalignedCheckpointITCase failed due to OperatorCoordinatorHolder cannot mark 
> checkpoint
> 
>
> Key: FLINK-27295
> URL: https://issues.apache.org/jira/browse/FLINK-27295
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Yun Tang
>Priority: Major
>
>  
> {code:java}
> 09:17:42,931 [flink-akka.actor.default-dispatcher-9] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to 
> recover from a global failure.
> java.lang.IllegalStateException: Cannot mark for checkpoint 17, already 
> marked for checkpoint 16
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
>  ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:302)
>  ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:230)
>  ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:444)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:444)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:214)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:164)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.Actor.aroundReceive(Actor.scala:537) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.ActorCell.invoke(ActorCell.scala:548) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
> 

[jira] [Commented] (FLINK-27295) UnalignedCheckpointITCase failed due to OperatorCoordinatorHolder cannot mark checkpoint

2022-04-19 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524679#comment-17524679
 ] 

Jiangjie Qin commented on FLINK-27295:
--

It looks like a duplicate of FLINK-27148.

> UnalignedCheckpointITCase failed due to OperatorCoordinatorHolder cannot mark 
> checkpoint
> 
>
> Key: FLINK-27295
> URL: https://issues.apache.org/jira/browse/FLINK-27295
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Yun Tang
>Priority: Major
>
>  
> {code:java}
> 09:17:42,931 [flink-akka.actor.default-dispatcher-9] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to 
> recover from a global failure.
> java.lang.IllegalStateException: Cannot mark for checkpoint 17, already 
> marked for checkpoint 16
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorEventValve.markForCheckpoint(OperatorEventValve.java:113)
>  ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:302)
>  ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:230)
>  ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:444)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:444)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:214)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:164)
>  ~[flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.Actor.aroundReceive(Actor.scala:537) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.actor.ActorCell.invoke(ActorCell.scala:548) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
> [flink-rpc-akka_6cc3640b-2a0a-43c9-a46c-e21f0e6e55f6.jar:1.16-SNAPSHOT]
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
> 

[jira] [Resolved] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-20 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-26723.
--
Fix Version/s: 1.15.0
   1.14.4
   Resolution: Fixed

PR Merged.

master: da79677c9afcd6703ee5cd513740981530916f2f
release-1.15: e3992ab17a4515076c287b8d12003e1449718df1
release-1.14: 36136bc2bc33f0dc0add1303af949f681a7e42cd

> Fix the content of exception in SourceCoordinatorContext
> 
>
> Key: FLINK-26723
> URL: https://issues.apache.org/jira/browse/FLINK-26723
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
>
> The exception in `SourceCoordinatorContext#SourceCoordinatorContext` will 
> always be 
> "Cannot assign splits null to subtask xxx because the subtask is not 
> registered."
> We should fix it by using the split info.
> [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-19 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-26723:


Assignee: zoucao

> Fix the content of exception in SourceCoordinatorContext
> 
>
> Key: FLINK-26723
> URL: https://issues.apache.org/jira/browse/FLINK-26723
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zoucao
>Assignee: zoucao
>Priority: Major
>  Labels: pull-request-available
>
> The exception in `SourceCoordinatorContext#SourceCoordinatorContext` will 
> always be 
> "Cannot assign splits null to subtask xxx because the subtask is not 
> registered."
> We should fix it by using the split info.
> [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-18 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508760#comment-17508760
 ] 

Jiangjie Qin edited comment on FLINK-26723 at 3/18/22, 12:51 PM:
-

[~zoucao]  yes, we should get the splits from the assignment here. That is a 
bug. Do you want to submit a patch?


was (Author: becket_qin):
[~zoucao]  yes, we should get the splits from the assignment here. That is a 
bug.

> Fix the content of exception in SourceCoordinatorContext
> 
>
> Key: FLINK-26723
> URL: https://issues.apache.org/jira/browse/FLINK-26723
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zoucao
>Priority: Major
>
> The exception in `SourceCoordinatorContext#SourceCoordinatorContext` will 
> always be 
> "Cannot assign splits null to subtask xxx because the subtask is not 
> registered."
> We should fix it by using the split info.
> [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-18 Thread Jiangjie Qin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-26723 ]


Jiangjie Qin deleted comment on FLINK-26723:
--

was (Author: becket_qin):
Do you want to submit a patch?

> Fix the content of exception in SourceCoordinatorContext
> 
>
> Key: FLINK-26723
> URL: https://issues.apache.org/jira/browse/FLINK-26723
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zoucao
>Priority: Major
>
> The exception in `SourceCoordinatorContext#SourceCoordinatorContext` will 
> always be 
> "Cannot assign splits null to subtask xxx because the subtask is not 
> registered."
> We should fix it by using the split info.
> [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-18 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508760#comment-17508760
 ] 

Jiangjie Qin commented on FLINK-26723:
--

[~zoucao]  yes, we should get the splits from the assignment here. That is a 
bug.
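
For illustration, the fix could look roughly like the following sketch (names 
approximate the linked SourceCoordinatorContext code; this is not the merged 
patch):

{code:java}
// Sketch: include the splits of the failed assignment in the message
// instead of a variable that is still null at this point.
private void ensureSubtaskRegistered(int subtaskId, SplitsAssignment<SplitT> assignment) {
    if (!registeredReaders.containsKey(subtaskId)) {
        throw new IllegalArgumentException(
                String.format(
                        "Cannot assign splits %s to subtask %d because the "
                                + "subtask is not registered.",
                        assignment.assignment().get(subtaskId), subtaskId));
    }
}
{code}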

> Fix the content of exception in SourceCoordinatorContext
> 
>
> Key: FLINK-26723
> URL: https://issues.apache.org/jira/browse/FLINK-26723
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zoucao
>Priority: Major
>
> The exception in `SourceCoordinatorContext#SourceCoordinatorContext` will 
> always be 
> "Cannot assign splits null to subtask xxx because the subtask is not 
> registered."
> We should fix it by using the split info.
> [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-18 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508762#comment-17508762
 ] 

Jiangjie Qin commented on FLINK-26723:
--

Do you want to submit a patch?

> Fix the content of exception in SourceCoordinatorContext
> 
>
> Key: FLINK-26723
> URL: https://issues.apache.org/jira/browse/FLINK-26723
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: zoucao
>Priority: Major
>
> The exception in `SourceCoordinatorContext#SourceCoordinatorContext` will 
> always be 
> "Cannot assign splits null to subtask xxx because the subtask is not 
> registered."
> We should fix it by using the split info.
> [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26394) CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires while the checkpointCoordinator task is queuing in the SourceCoordinator executor.

2022-03-13 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17505968#comment-17505968
 ] 

Jiangjie Qin commented on FLINK-26394:
--

Good catch. We need to recycle the futures that are handed to the 
SourceCoordinator (and, in general, to any user-implemented plugins) in case 
they are not completed for some reason.
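
To illustrate the idea with a sketch (hypothetical helper and names, not an 
actual fix): when the coordinator executor is shut down, any checkpoint future 
it still holds should be failed rather than left pending.

{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Sketch: fail every checkpoint future whose queued task was discarded by
// shutdownNow(), so isTriggering can be reset instead of being stuck forever.
// completeExceptionally() is a no-op for futures that are already done.
static void failPendingCheckpointFutures(
        ExecutorService coordinatorExecutor,
        Map<Long, CompletableFuture<byte[]>> pendingCheckpointFutures) {
    coordinatorExecutor.shutdownNow(); // discards queued coordinator tasks
    pendingCheckpointFutures.forEach(
            (checkpointId, future) ->
                    future.completeExceptionally(
                            new IllegalStateException(
                                    "Coordinator executor was shut down before "
                                            + "checkpoint " + checkpointId
                                            + " completed.")));
}
{code}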

> CheckpointCoordinator.isTriggering can not be reset if a checkpoint expires 
> while the checkpointCoordinator task is queuing in the SourceCoordinator 
> executor.
> --
>
> Key: FLINK-26394
> URL: https://issues.apache.org/jira/browse/FLINK-26394
> Project: Flink
>  Issue Type: Bug
>Reporter: Gen Luo
>Priority: Major
>  Labels: pull-request-available
>
> We found that a job can no longer trigger checkpoints or savepoints after 
> recovering from a checkpoint timeout failure. After investigation, we found 
> that the `isTriggering` flag in CheckpointCoordinator is true while no 
> checkpoint is actually in progress, and the root cause is the following:
>  
>  # The job uses a source whose coordinator needs to scan a table while 
> requesting splits, which may cost more than 10min. The source coordinator 
> executor thread will be occupied by `handleSplitRequest`, and the 
> `checkpointCoordinator` task of the first checkpoint will be queued behind it.
>  # 10min later, the checkpoint expires, removing the pending checkpoint 
> from the coordinator and triggering a global failover. But the 
> `isTriggering` flag is not reset here. It can only be reset after the 
> checkpoint completable future is done, which is now held only by the 
> `checkpointCoordinator` task in the queue, along with the PendingCheckpoint.
>  # Then the job fails over, and the RecreateOnResetOperatorCoordinator will 
> recreate a new SourceCoordinator and close the previous coordinator 
> asynchronously. The timeout for the closing is fixed at 60s. SourceCoordinator 
> will try to `shutdown` the coordinator executor and then `awaitTermination`. 
> If the tasks are done within 60s, nothing wrong will happen.
>  # But if the closing method is stuck for more than 60s (which in this case 
> is actually stuck in `handleSplitRequest`), the async closing thread will 
> be interrupted and SourceCoordinator will `shutdownNow` the executor. All 
> queued tasks will be discarded, including the `checkpointCoordinator` task.
>  # Then the checkpoint completable future will never complete and the 
> `isTriggering` flag will never be reset.
>  
> I see that the closing part of SourceCoordinator was recently refactored, but 
> the new implementation also has this issue. And since it calls `shutdownNow` 
> directly, the issue should be easier to encounter.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25256) Savepoints do not work with ExternallyInducedSources

2022-03-08 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503274#comment-17503274
 ] 

Jiangjie Qin commented on FLINK-25256:
--

[~Brian Zhou] I think [~renqs] and [~Leonard] seem the best people to follow 
up on this issue.

> Savepoints do not work with ExternallyInducedSources
> 
>
> Key: FLINK-25256
> URL: https://issues.apache.org/jira/browse/FLINK-25256
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.13.3
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> It is not possible to take a proper savepoint with 
> {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy 
> and FLIP-27 versions). The problem is that we're hardcoding 
> {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators would try to take 
> checkpoints in the checkpoint location, whereas the {{CheckpointCoordinator}} 
> will write metadata for those states in the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if 
> we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In 
> such a case, the location and format at which the state of a particular task 
> is persisted depend on the order of barrier arrival. If a barrier from a 
> regular source arrives last, the task takes a savepoint; on the other hand, if 
> the last barrier is from an externally induced source, it will take a 
> checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent

2022-03-08 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503240#comment-17503240
 ] 

Jiangjie Qin commented on FLINK-21364:
--

[~stevenz3wu] Sorry for the late response. I somehow missed the notification 
email. Personally I feel that we should probably have a separate 
{{FinishedSplitsEvent}} regardless of whether we piggyback the finishedSplitIds 
in the {{RequestSplitEvent}}. The piggybacking sounds more like an optimization 
to save sending an event when possible.
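
For illustration, a dedicated event could be as simple as the following sketch 
(a hypothetical class, not an existing Flink API):

{code:java}
import java.util.Collection;
import org.apache.flink.api.connector.source.SourceEvent;

// Hypothetical event: carries the IDs of the splits a reader has finished,
// sent from the SourceReader to the SplitEnumerator independently of any
// split request.
public class FinishedSplitsEvent implements SourceEvent {
    private static final long serialVersionUID = 1L;

    private final Collection<String> finishedSplitIds;

    public FinishedSplitsEvent(Collection<String> finishedSplitIds) {
        this.finishedSplitIds = finishedSplitIds;
    }

    public Collection<String> getFinishedSplitIds() {
        return finishedSplitIds;
    }
}
{code}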

> piggyback finishedSplitIds in RequestSplitEvent
> ---
>
> Key: FLINK-21364
> URL: https://issues.apache.org/jira/browse/FLINK-21364
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.12.1
>Reporter: Steven Zhen Wu
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> For some split assignment strategies, the enumerator/assigner needs to track 
> the completed splits to advance the watermark for event-time alignment or 
> rough ordering. Right now, `RequestSplitEvent` for FLIP-27 sources doesn't 
> support passing along the `finishedSplitIds` info, and hence we have to create 
> our own custom source event type for the Iceberg source. 
> Here is the proposal to add such optional info to `RequestSplitEvent`:
> {code}
> public RequestSplitEvent(
> @Nullable String hostName, 
> @Nullable Collection<String> finishedSplitIds)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25256) Savepoints do not work with ExternallyInducedSources

2022-03-02 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500468#comment-17500468
 ] 

Jiangjie Qin commented on FLINK-25256:
--

[~dwysakowicz] Pravega essentially uses an in-band checkpoint style. The 
checkpoint process is roughly the following:
 # The Flink CheckpointCoordinator initiates the checkpoint and invokes 
MasterHooks.triggerHook(). The Pravega hook then tells the Pravega server that 
the Flink job has triggered a checkpoint.
 # The Pravega server inserts checkpoint control messages into the data 
stream of each of the Pravega readers of the Flink job.
 # When the Pravega readers see the checkpoint control messages, they trigger 
the Flink task checkpoint via the {{ExternallyInducedSource.CheckpointTrigger}}.

After FLIP-27, the SplitEnumerator can completely replace the MasterHook in the 
JM. But the Pravega connector still relies on the 
{{ExternallyInducedSource.CheckpointTrigger}} to perform checkpoints in the 
subtasks.

Ultimately, the requirement is to manipulate the task based on some user-space 
records. A similar requirement is stopping the subtask when it sees a given 
message in the stream. What we need to think about is how many control-plane 
actions we want to expose to the users. So far the two actions we see are 
taking a checkpoint on the tasks and stopping the tasks, and by now such 
manipulation requirements exist only in the Source tasks.

We can probably just make such record-driven task actions a more explicit 
primitive for the users. For example, we can have an interface like 
{{TaskActionTrigger}}, which is passed to each piece of user logic and allows 
the user logic to ask the task to take some action based on the records it has 
processed. That said, I do think such control-plane exposure should be minimal.
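
To make that concrete, the primitive could look roughly like this (purely a 
sketch of the idea above, not an existing or proposed API):

{code:java}
// Sketch only: a handle given to user logic (e.g. a SourceReader) so that
// record-driven control actions can be requested from the hosting task.
public interface TaskActionTrigger {

    /**
     * Ask the hosting task to take a checkpoint, e.g. when an in-band
     * checkpoint control message (as in Pravega) is seen in the stream.
     */
    void triggerCheckpoint(long checkpointId);

    /**
     * Ask the hosting task to stop, e.g. when a designated end-of-stream
     * message is seen in the stream.
     */
    void stopTask();
}
{code}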

> Savepoints do not work with ExternallyInducedSources
> 
>
> Key: FLINK-25256
> URL: https://issues.apache.org/jira/browse/FLINK-25256
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.0, 1.13.3
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> It is not possible to take a proper savepoint with 
> {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy 
> and FLIP-27 versions). The problem is that we're hardcoding 
> {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators would try to take 
> checkpoints in the checkpoint location, whereas the {{CheckpointCoordinator}} 
> will write metadata for those states in the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if 
> we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In 
> such a case, the location and format at which the state of a particular task 
> is persisted depend on the order of barrier arrival. If a barrier from a 
> regular source arrives last, the task takes a savepoint; on the other hand, if 
> the last barrier is from an externally induced source, it will take a 
> checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-23 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-24607.
--
Resolution: Fixed

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-23 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-24607:
-
Fix Version/s: 1.13.7

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-23 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497109#comment-17497109
 ] 

Jiangjie Qin commented on FLINK-24607:
--

Patch merged.

Master: 0f19c2472c54aac97e4067f5398731ab90036d1a

release-1.14: 0a76d632f33d9a69df87457a63043bd7f609ed40

release-1.13: 6fb7807b20cba05437a7b570c704d643432745eb

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-22 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-24607:
-
Fix Version/s: (was: 1.13.7)

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24844) CassandraConnectorITCase.testCassandraBatchPojoFormat fails on AZP

2022-02-14 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492363#comment-17492363
 ] 

Jiangjie Qin commented on FLINK-24844:
--

Another failure. 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31374=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=12262

> CassandraConnectorITCase.testCassandraBatchPojoFormat fails on AZP
> --
>
> Key: FLINK-24844
> URL: https://issues.apache.org/jira/browse/FLINK-24844
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Cassandra
>Affects Versions: 1.14.0
>Reporter: Till Rohrmann
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.15.0, 1.14.4, 1.13.7
>
>
> The test {{CassandraConnectorITCase.testCassandraBatchPojoFormat}} fails on 
> AZP with
> {code}
> 2021-11-09T00:32:42.1369473Z Nov 09 00:32:42 [ERROR] Tests run: 17, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 152.962 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase
> 2021-11-09T00:32:42.1371638Z Nov 09 00:32:42 [ERROR] 
> testCassandraBatchPojoFormat  Time elapsed: 20.378 s  <<< ERROR!
> 2021-11-09T00:32:42.1372881Z Nov 09 00:32:42 
> com.datastax.driver.core.exceptions.AlreadyExistsException: Table 
> flink.batches already exists
> 2021-11-09T00:32:42.1373913Z Nov 09 00:32:42  at 
> com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:111)
> 2021-11-09T00:32:42.1374921Z Nov 09 00:32:42  at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> 2021-11-09T00:32:42.1379615Z Nov 09 00:32:42  at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> 2021-11-09T00:32:42.1380668Z Nov 09 00:32:42  at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> 2021-11-09T00:32:42.1381523Z Nov 09 00:32:42  at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> 2021-11-09T00:32:42.1382552Z Nov 09 00:32:42  at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testCassandraBatchPojoFormat(CassandraConnectorITCase.java:543)
> 2021-11-09T00:32:42.1383487Z Nov 09 00:32:42  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-11-09T00:32:42.1384433Z Nov 09 00:32:42  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-11-09T00:32:42.1385336Z Nov 09 00:32:42  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-11-09T00:32:42.1386119Z Nov 09 00:32:42  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-11-09T00:32:42.1387204Z Nov 09 00:32:42  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-11-09T00:32:42.1388225Z Nov 09 00:32:42  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-11-09T00:32:42.1389101Z Nov 09 00:32:42  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-11-09T00:32:42.1400913Z Nov 09 00:32:42  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-11-09T00:32:42.1401588Z Nov 09 00:32:42  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-11-09T00:32:42.1402487Z Nov 09 00:32:42  at 
> org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:192)
> 2021-11-09T00:32:42.1403055Z Nov 09 00:32:42  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2021-11-09T00:32:42.1403556Z Nov 09 00:32:42  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2021-11-09T00:32:42.1404008Z Nov 09 00:32:42  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2021-11-09T00:32:42.1404650Z Nov 09 00:32:42  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2021-11-09T00:32:42.1405151Z Nov 09 00:32:42  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2021-11-09T00:32:42.1405632Z Nov 09 00:32:42  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2021-11-09T00:32:42.1406166Z Nov 09 00:32:42  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2021-11-09T00:32:42.1406670Z Nov 09 00:32:42  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2021-11-09T00:32:42.1407125Z Nov 09 00:32:42  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2021-11-09T00:32:42.1407599Z Nov 09 00:32:42  at 
> 

[jira] [Assigned] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2022-02-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-26029:


Assignee: Dong Lin

> Generalize the checkpoint protocol of OperatorCoordinator.
> --
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Jiangjie Qin
>Assignee: Dong Lin
>Priority: Major
>  Labels: extensibility
> Fix For: 1.16.0
>
>
> Currently the JM opens all the event valves from the OperatorCoordinator to 
> the subtasks after the checkpoint barriers are sent to the Source subtasks. 
> While this works for the Source Operators, it unnecessarily limits general 
> usage of the OperatorCoordinator for other operators.
> To generalize the protocol, we can change the JM to open the event valve of 
> the subtasks that have finished the local checkpoint. So the protocol would 
> become the following:
>  # Let the OC finish processing all the incoming OperatorEvents before the 
> snapshot.
>  # Wait until all the outgoing OperatorEvents before the snapshot are sent 
> and acked.
>  # Shut the event valve so no outgoing events can be sent to the subtasks.
>  # Send checkpoint barriers to the Source operators.
>  # Open the corresponding event valve of a subtask when the 
> AcknowledgeCheckpoint message from that subtask is received. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24439) Introduce CoordinatorStore

2022-02-08 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489279#comment-17489279
 ] 

Jiangjie Qin commented on FLINK-24439:
--

[~pnowojski] This effectively introduces a cross-operator communication 
mechanism for control purposes. Can you elaborate a bit on the intended usage 
of this {{CoordinatorStore}}? Will it be used as an ordinary shared 
hashmap? If so, will there be any convention for the keys to avoid conflicts 
between different operators?
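
For illustration, if it is indeed used as an ordinary shared map, namespaced 
keys might look like the following sketch (all names hypothetical):

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustration of the key-conflict question above: with a plain shared
// concurrent map, every coordinator would need to namespace its keys.
class SharedStoreUsageSketch {
    private final ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();

    void reportWatermark(String sourceId, long watermark) {
        // Convention: prefix keys with the purpose plus a unique source id.
        String key = "watermark-alignment:" + sourceId;
        store.merge(key, watermark, Math::min); // aggregate the min watermark
    }
}
{code}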

> Introduce CoordinatorStore
> --
>
> Key: FLINK-24439
> URL: https://issues.apache.org/jira/browse/FLINK-24439
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> In order to allow {{SourceCoordinators}}s from different {{Sources}} (for 
> example two different Kafka sources, or Kafka and Kinesis) to align 
> watermarks, they have to be able to exchange information/aggregate watermarks 
> from those different Sources. To enable this, we need to provide some 
> {{CoordinatorStore}} concept, that would be a thread safe singleton.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-08 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-24607:
-
Fix Version/s: 1.15.0
   1.13.6
   1.14.4

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.15.0, 1.13.6, 1.14.4
>
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-08 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-24607:


Assignee: Jiangjie Qin

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Assignee: Jiangjie Qin
>Priority: Critical
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-02-08 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489273#comment-17489273
 ] 

Jiangjie Qin commented on FLINK-24607:
--

[~jark] [~dmvk] Sorry I did not notice the attached log file earlier. From the 
log, it looks like the split enumerators are sometimes not closed because the 
coordinator closing sequence timed out before it reached the code block that 
closes the enumerator.

More specifically, either the main executor or the worker executor is 
performing some time-consuming task which causes the executor shutdown to take 
more than 60 seconds. After that, the closing sequence is interrupted and the 
rest of the closing sequence is skipped.

What we can do here is put the split enumerator closing sequence in the 
finally block of {{SourceCoordinator.close()}}. That ensures 
{{enumerator.close()}} will be invoked. Also, we will need to make sure that 
the executors actually exit even if the split enumerator executes some tasks 
that run indefinitely.

I'll submit a patch shortly.
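
Roughly, the intended shape is the following (a simplified sketch, not the 
merged patch; {{started}}, {{enumerator}}, and the shutdown helper only 
approximate the real SourceCoordinator):

{code:java}
// Simplified sketch: the enumerator close moves into a finally block so it
// runs even if shutting down the executors times out or is interrupted.
@Override
public void close() throws Exception {
    try {
        // May block up to a timeout and then get interrupted.
        shutdownExecutors();
    } finally {
        if (started && enumerator != null) {
            // Invoked regardless of what happened above.
            enumerator.close();
        }
    }
}
{code}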

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Priority: Critical
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2022-02-08 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-26029:
-
Labels: extensibility  (was: )

> Generalize the checkpoint protocol of OperatorCoordinator.
> --
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: extensibility
> Fix For: 1.16.0
>
>
> Currently the JM opens all the event valves from the OperatorCoordinator to 
> the subtasks after the checkpoint barriers are sent to the Source subtasks. 
> While this works for the Source Operators, it unnecessarily limits general 
> usage of the OperatorCoordinator for other operators.
> To generalize the protocol, we can change the JM to open the event valve of 
> the subtasks that have finished the local checkpoint. So the protocol would 
> become the following:
>  # Let the OC finish processing all the incoming OperatorEvents before the 
> snapshot.
>  # Wait until all the outgoing OperatorEvents before the snapshot are sent 
> and acked.
>  # Shut the event valve so no outgoing events can be sent to the subtasks.
>  # Send checkpoint barriers to the Source operators.
>  # Open the corresponding event valve of a subtask when the 
> AcknowledgeCheckpoint message from that subtask is received. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2022-02-08 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-26029:
-
Fix Version/s: 1.16.0

> Generalize the checkpoint protocol of OperatorCoordinator.
> --
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Jiangjie Qin
>Priority: Major
> Fix For: 1.16.0
>
>
> Currently the JM opens all the event valves from the OperatorCoordinator to 
> the subtasks after the checkpoint barriers are sent to the Source subtasks. 
> While this works for the Source Operators, it unnecessarily limits general 
> usage of the OperatorCoordinator for other operators.
> To generalize the protocol, we can change the JM to open the event valve of 
> the subtasks that have finished the local checkpoint. So the protocol would 
> become the following:
>  # Let the OC finish processing all the incoming OperatorEvents before the 
> snapshot.
>  # Wait until all the outgoing OperatorEvents before the snapshot are sent 
> and acked.
>  # Shut the event valve so no outgoing events can be sent to the subtasks.
>  # Send checkpoint barriers to the Source operators.
>  # Open the corresponding event valve of a subtask when the 
> AcknowledgeCheckpoint message from that subtask is received. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2022-02-08 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-26029:


 Summary: Generalize the checkpoint protocol of OperatorCoordinator.
 Key: FLINK-26029
 URL: https://issues.apache.org/jira/browse/FLINK-26029
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.14.3
Reporter: Jiangjie Qin


Currently the JM opens all the event valves from the OperatorCoordinator to the 
subtasks after the checkpoint barriers are sent to the Source subtasks. While 
this works for the Source Operators, it unnecessarily limits general usage of 
the OperatorCoordinator for other operators.

To generalize the protocol, we can change the JM to open the event valve of the 
subtasks that have finished the local checkpoint. So the protocol would become 
the following (a rough pseudocode sketch follows the list):
 # Let the OC finish processing all the incoming OperatorEvents before the 
snapshot.
 # Wait until all the outgoing OperatorEvents before the snapshot are sent and 
acked.
 # Shut the event valve so no outgoing events can be sent to the subtasks.
 # Send checkpoint barriers to the Source operators.
 # Open the corresponding event valve of a subtask when the 
AcknowledgeCheckpoint message from that subtask is received. 
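
{code:java}
// Pseudocode sketch of the five steps above. All names are hypothetical and
// only approximate the JM-side coordinator holder; this is not the actual
// implementation.
void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
    mainThreadExecutor.execute(() -> {
        drainIncomingOperatorEvents();              // step 1
        waitForOutgoingOperatorEventAcks();         // step 2
        eventValve.markForCheckpoint(checkpointId); // step 3: shut the valve
        // Step 4 follows: the snapshot completes and the checkpoint barriers
        // are sent to the Source operators.
        result.complete(coordinator.snapshotState(checkpointId));
    });
}

void onAcknowledgeCheckpoint(int subtaskIndex) {
    // Step 5: reopen the valve per subtask as soon as its ack arrives,
    // instead of opening all valves after barriers were sent to the sources.
    eventValve.openValveFor(subtaskIndex);
}
{code}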



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24607) SourceCoordinator may miss to close SplitEnumerator when failover frequently

2022-01-12 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475066#comment-17475066
 ] 

Jiangjie Qin commented on FLINK-24607:
--

[~dmvk] I am not sure about the exact cause of this issue. But I went through 
the SourceCoordinator protocol a couple of days ago and there might be a 
threading bug. I will verify that first and see if that is related to this 
ticket.

> SourceCoordinator may miss to close SplitEnumerator when failover frequently
> 
>
> Key: FLINK-24607
> URL: https://issues.apache.org/jira/browse/FLINK-24607
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.13.3
>Reporter: Jark Wu
>Priority: Critical
> Attachments: jobmanager.log
>
>
> We are having a connection leak problem when using mysql-cdc [1] source. We 
> observed that many enumerators are not closed from the JM log.
> {code}
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring 
> SplitEnumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split 
> enumerator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting 
> enumerator" | wc -l
>  263
> ➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing 
> SourceCoordinator" | wc -l
>  264
> ➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing 
> enumerator" | wc -l
>  195
> {code}
> We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
> "Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. From the above 
> result you can see that the SourceCoordinator is restored and closed 264 
> times, while the split enumerator is started 264 times but only closed 195 
> times. It seems that {{SourceCoordinator}} misses closing the enumerator when 
> the job fails over frequently. 
> I also went through the code of {{SourceCoordinator}} and found a 
> suspicious point:
> The {{started}} flag and {{enumerator}} are assigned in the main thread; 
> however, {{SourceCoordinator#close()}} is executed asynchronously by 
> {{DeferrableCoordinator#closeAsync}}. That means the close method will check 
> the {{started}} and {{enumerator}} variables asynchronously. Is there any 
> concurrency problem here which may lead to a dirty read and the 
> {{enumerator}} not being closed? 
> I'm still not sure, because it's hard to reproduce locally, and we can't 
> deploy a custom Flink version to the production env. 
> [1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25152) FLIP-188: Introduce Built-in Dynamic Table Storage

2022-01-11 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-25152:
-
Fix Version/s: table-store-0.1.0
   (was: 1.15.0)

> FLIP-188: Introduce Built-in Dynamic Table Storage
> --
>
> Key: FLINK-25152
> URL: https://issues.apache.org/jira/browse/FLINK-25152
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Ecosystem, Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: table-store-0.1.0
>
>
> Introduce built-in storage support for dynamic tables, a truly unified 
> changelog & table representation from Flink SQL’s perspective. The storage 
> will improve usability a lot.
> For more detail, see: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25152) FLIP-188: Introduce Built-in Dynamic Table Storage

2022-01-11 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-25152:
-
Component/s: Table Store

> FLIP-188: Introduce Built-in Dynamic Table Storage
> --
>
> Key: FLINK-25152
> URL: https://issues.apache.org/jira/browse/FLINK-25152
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Ecosystem, Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
> Fix For: 1.15.0
>
>
> Introduce built-in storage support for dynamic tables, a truly unified 
> changelog & table representation from Flink SQL’s perspective. The storage 
> will improve usability a lot.
> For more detail, see: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (FLINK-10737) FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint failed on Travis

2021-11-29 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-10737:


Assignee: Qingsheng Ren  (was: Jiangjie Qin)

> FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint failed on Travis
> 
>
> Key: FLINK-10737
> URL: https://issues.apache.org/jira/browse/FLINK-10737
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.7.0, 1.8.0, 1.12.5, 1.14.1
>Reporter: Till Rohrmann
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: test-stability
>
> The {{FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint}} failed on 
> Travis:
> https://api.travis-ci.org/v3/job/448781612/log.txt



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-10737) FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint failed on Travis

2021-11-29 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450837#comment-17450837
 ] 

Jiangjie Qin commented on FLINK-10737:
--

[~trohrmann] I probably won't be able to look into the ticket. I have asked 
[~renqs] to help.

> FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint failed on Travis
> 
>
> Key: FLINK-10737
> URL: https://issues.apache.org/jira/browse/FLINK-10737
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.7.0, 1.8.0, 1.12.5, 1.14.1
>Reporter: Till Rohrmann
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: test-stability
>
> The {{FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint}} failed on 
> Travis:
> https://api.travis-ci.org/v3/job/448781612/log.txt



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24949) KafkaITCase.testBigRecordJob fails on azure

2021-11-21 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447195#comment-17447195
 ] 

Jiangjie Qin commented on FLINK-24949:
--

[~gaoyunhaii] not sure if this is the same cause. [~renqs] can you take a look?

> KafkaITCase.testBigRecordJob fails on azure
> ---
>
> Key: FLINK-24949
> URL: https://issues.apache.org/jira/browse/FLINK-24949
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.1
>Reporter: Yun Gao
>Priority: Major
>  Labels: test-stability
> Fix For: 1.14.1
>
>
> {code:java}
> Nov 17 23:39:39 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 222.57 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Nov 17 23:39:39 [ERROR] testBigRecordJob  Time elapsed: 60.02 s  <<< ERROR!
> Nov 17 23:39:39 org.junit.runners.model.TestTimedOutException: test timed out 
> after 60000 milliseconds
> Nov 17 23:39:39   at sun.misc.Unsafe.park(Native Method)
> Nov 17 23:39:39   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Nov 17 23:39:39   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> Nov 17 23:39:39   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> Nov 17 23:39:39   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> Nov 17 23:39:39   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Nov 17 23:39:39   at 
> org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:58)
> Nov 17 23:39:39   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runBigRecordTestTopology(KafkaConsumerTestBase.java:1473)
> Nov 17 23:39:39   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testBigRecordJob(KafkaITCase.java:119)
> Nov 17 23:39:39   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Nov 17 23:39:39   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Nov 17 23:39:39   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Nov 17 23:39:39   at java.lang.reflect.Method.invoke(Method.java:498)
> Nov 17 23:39:39   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Nov 17 23:39:39   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Nov 17 23:39:39   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Nov 17 23:39:39   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Nov 17 23:39:39   at java.lang.Thread.run(Thread.java:748)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26679=logs=c5f0071e-1851-543e-9a45-9ac140befc32=15a22db7-8faa-5b34-3920-d33c9f0ca23c=7161
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-24376) Operator name in OperatorCoordinator should not use chained name

2021-10-11 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-24376:
-
Fix Version/s: 1.14.1

> Operator name in OperatorCoordinator should not use chained name
> 
>
> Key: FLINK-24376
> URL: https://issues.apache.org/jira/browse/FLINK-24376
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0, 1.12.5, 1.13.2, 1.14.1
>Reporter: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.1
>
>
> Currently the operator name passed to 
> {{CoordinatedOperatorFactory#getCoordinatorProvider}} is a chained operator 
> name (e.g. Source -> Map) instead of the name of the coordinating operator, 
> which might be misleading. 
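> For illustration, a minimal sketch of where the name surfaces, assuming a 
> custom factory (the class and log message are made up):
> {code:java}
> import org.apache.flink.runtime.jobgraph.OperatorID;
> import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
> import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
> 
> public abstract class MyCoordinatedFactory<OUT> implements CoordinatedOperatorFactory<OUT> {
> 
>     @Override
>     public OperatorCoordinator.Provider getCoordinatorProvider(
>             String operatorName, OperatorID operatorID) {
>         // With chaining enabled, operatorName arrives as the chained name,
>         // e.g. "Source -> Map", not the coordinating operator's own name.
>         System.out.println("Creating coordinator for: " + operatorName);
>         return createProvider(operatorID);
>     }
> 
>     protected abstract OperatorCoordinator.Provider createProvider(OperatorID operatorID);
> }
> {code}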



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24376) Operator name in OperatorCoordinator should not use chained name

2021-10-11 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-24376:
-
Affects Version/s: (was: 1.14.1)

> Operator name in OperatorCoordinator should not use chained name
> 
>
> Key: FLINK-24376
> URL: https://issues.apache.org/jira/browse/FLINK-24376
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.14.0, 1.12.5, 1.13.2
>Reporter: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.1
>
>
> Currently the operator name passed to 
> {{CoordinatedOperatorFactory#getCoordinatorProvider}} is a chained operator 
> name (e.g. Source -> Map) instead of the name of the coordinating operator, 
> which might be misleading. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-22915) FLIP-173: Support DAG of algorithms

2021-09-26 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-22915.
--
Resolution: Implemented

Merged to master: 5ff346ea1a508a00b89759492f09e7330e69baef

> FLIP-173: Support DAG of algorithms
> ---
>
> Key: FLINK-22915
> URL: https://issues.apache.org/jira/browse/FLINK-22915
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.1.0
>
>
> The FLIP design doc can be found at 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615783.
> The existing Flink ML library allows users to compose an 
> Estimator/Transformer from a pipeline (i.e. linear sequence) of 
> Estimator/Transformer, and each Estimator/Transformer has one input and one 
> output.
> The following use-cases are not supported yet. And we would like to address 
> these use-cases with the changes proposed in this FLIP.
> 1) Express an Estimator/Transformer that has multiple inputs/outputs.
> For example, some graph embedding algorithms (e.g., MetaPath2Vec) need to 
> take two tables as inputs. These two tables represent node labels and edges 
> of the graph respectively. This logic can be expressed as an Estimator with 2 
> input tables.
> And some workflows may need to split 1 table into 2 tables, and use these 
> tables for training and validation respectively. This logic can be expressed 
> by a Transformer with 1 input table and 2 output tables.
> 2) Express a generic machine learning computation logic that does not have 
> the "transformation" semantic.
> We believe most machine learning engineers associate the name "Transformer" 
> with the "transformation" semantic, where a record in the output 
> typically corresponds to one record in the input. Thus it is 
> counter-intuitive to use Transformer to encode aggregation logic, where a 
> record in the output corresponds to an arbitrary number of records in the 
> input.
> Therefore we need to have a class with a name different from "Transformer" to 
> encode generic multi-input multi-output computation logic. 
> 3) Online learning where a long-running Model instance needs to be 
> continuously updated by the latest model data generated by another 
> long-running Estimator instance.
> In this scenario, we need to allow the Estimator to be run on a different 
> machine than the Model, so that the Estimator could consume sufficient 
> computation resource in a cluster while the Model could be deployed on edge 
> devices.
> 4) Provide APIs to allow Estimator/Model to be efficiently saved/loaded even 
> if the state (e.g. model data) of the Estimator/Model is more than 10s of GBs.
> The existing PipelineStage::toJson basically requires the developer of the 
> Estimator/Model to serialize all model data into an in-memory string, which 
> could be very inefficient (or practically impossible) if the model data is 
> very large (e.g. 10s of GBs).
> In addition to addressing the above use-cases, this FLIP also proposes a few 
> more changes to simplify the class hierarchy and improve API usability. The 
> existing Flink ML library has the following usability issues:
> 5) The fit/transform API requires users to explicitly provide the 
> TableEnvironment, even though the TableEnvironment could be retrieved from 
> the Table instance given to fit/transform.
> 6) A Pipeline is currently both a Transformer and an Estimator. The 
> experience of using a Pipeline is inconsistent with the experience of using 
> an Estimator (with the needFit API).
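> A minimal sketch of the multi-input/multi-output API shape described above, 
> simplified from the FLIP doc (the real interfaces carry generics and 
> parameter handling; see the FLIP for the authoritative definitions):
> {code:java}
> import org.apache.flink.table.api.Table;
> 
> // Generic multi-input/multi-output computation, free of the
> // "transformation" semantic mentioned in use-case 2).
> interface AlgoOperator {
>     Table[] transform(Table... inputs);
> }
> 
> // Note: no TableEnvironment argument (use-case 5); it can be derived from
> // the input tables. An estimator may take several tables, e.g.
> // model = metaPath2Vec.fit(nodeLabels, edges) for use-case 1).
> interface Estimator<M extends AlgoOperator> {
>     M fit(Table... inputs);
> }
> {code}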



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22915) FLIP-173: Support DAG of algorithms

2021-09-26 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22915:
-
Fix Version/s: 0.1.0

> FLIP-173: Support DAG of algorithms
> ---
>
> Key: FLINK-22915
> URL: https://issues.apache.org/jira/browse/FLINK-22915
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.1.0
>
>
> The FLIP design doc can be found at 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615783.
> The existing Flink ML library allows users to compose an 
> Estimator/Transformer from a pipeline (i.e. linear sequence) of 
> Estimator/Transformer, and each Estimator/Transformer has one input and one 
> output.
> The following use-cases are not supported yet. And we would like to address 
> these use-cases with the changes proposed in this FLIP.
> 1) Express an Estimator/Transformer that has multiple inputs/outputs.
> For example, some graph embedding algorithms (e.g., MetaPath2Vec) need to 
> take two tables as inputs. These two tables represent node labels and edges 
> of the graph respectively. This logic can be expressed as an Estimator with 2 
> input tables.
> And some workflows may need to split 1 table into 2 tables, and use these 
> tables for training and validation respectively. This logic can be expressed 
> by a Transformer with 1 input table and 2 output tables.
> 2) Express a generic machine learning computation logic that does not have 
> the "transformation" semantic.
> We believe most machine learning engineers associate the name "Transformer" 
> with the "transformation" semantic, where a record in the output 
> typically corresponds to one record in the input. Thus it is 
> counter-intuitive to use Transformer to encode aggregation logic, where a 
> record in the output corresponds to an arbitrary number of records in the 
> input.
> Therefore we need to have a class with a name different from "Transformer" to 
> encode generic multi-input multi-output computation logic. 
> 3) Online learning where a long-running Model instance needs to be 
> continuously updated by the latest model data generated by another 
> long-running Estimator instance.
> In this scenario, we need to allow the Estimator to be run on a different 
> machine than the Model, so that the Estimator could consume sufficient 
> computation resource in a cluster while the Model could be deployed on edge 
> devices.
> 4) Provide APIs to allow Estimator/Model to be efficiently saved/loaded even 
> if the state (e.g. model data) of the Estimator/Model is more than 10s of GBs.
> The existing PipelineStage::toJson basically requires the developer of the 
> Estimator/Model to serialize all model data into an in-memory string, which 
> could be very inefficient (or practically impossible) if the model data is 
> very large (e.g. 10s of GBs).
> In addition to addressing the above use-cases, this FLIP also proposes a few 
> more changes to simplify the class hierarchy and improve API usability. The 
> existing Flink ML library has the following usability issues:
> 5) The fit/transform API requires users to explicitly provide the 
> TableEnvironment, even though the TableEnvironment could be retrieved from 
> the Table instance given to fit/transform.
> 6) A Pipeline is currently both a Transformer and an Estimator. The 
> experience of using a Pipeline is inconsistent with the experience of using 
> an Estimator (with the needFit API).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22915) FLIP-173: Support DAG of algorithms

2021-09-26 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-22915:


Assignee: Dong Lin

> FLIP-173: Support DAG of algorithms
> ---
>
> Key: FLINK-22915
> URL: https://issues.apache.org/jira/browse/FLINK-22915
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>  Labels: pull-request-available
>
> The FLIP design doc can be found at 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615783.
> The existing Flink ML library allows users to compose an 
> Estimator/Transformer from a pipeline (i.e. linear sequence) of 
> Estimator/Transformer, and each Estimator/Transformer has one input and one 
> output.
> The following use-cases are not supported yet. And we would like to address 
> these use-cases with the changes proposed in this FLIP.
> 1) Express an Estimator/Transformer that has multiple inputs/outputs.
> For example, some graph embedding algorithms (e.g., MetaPath2Vec) need to 
> take two tables as inputs. These two tables represent node labels and edges 
> of the graph respectively. This logic can be expressed as an Estimator with 2 
> input tables.
> And some workflows may need to split 1 table into 2 tables, and use these 
> tables for training and validation respectively. This logic can be expressed 
> by a Transformer with 1 input table and 2 output tables.
> 2) Express a generic machine learning computation logic that does not have 
> the "transformation" semantic.
> We believe most machine learning engineers associate the name "Transformer" 
> with the "transformation" semantic, where a record in the output 
> typically corresponds to one record in the input. Thus it is 
> counter-intuitive to use Transformer to encode aggregation logic, where a 
> record in the output corresponds to an arbitrary number of records in the 
> input.
> Therefore we need to have a class with a name different from "Transformer" to 
> encode generic multi-input multi-output computation logic. 
> 3) Online learning where a long-running Model instance needs to be 
> continuously updated by the latest model data generated by another 
> long-running Estimator instance.
> In this scenario, we need to allow the Estimator to be run on a different 
> machine than the Model, so that the Estimator could consume sufficient 
> computation resource in a cluster while the Model could be deployed on edge 
> devices.
> 4) Provide APIs to allow Estimator/Model to be efficiently saved/loaded even 
> if the state (e.g. model data) of the Estimator/Model is more than 10s of GBs.
> The existing PipelineStage::toJson basically requires the developer of the 
> Estimator/Model to serialize all model data into an in-memory string, which 
> could be very inefficient (or practically impossible) if the model data is 
> very large (e.g. 10s of GBs).
> In addition to addressing the above use-cases, this FLIP also proposes a few 
> more changes to simplify the class hierarchy and improve API usability. The 
> existing Flink ML library has the following usability issues:
> 5) The fit/transform API requires users to explicitly provide the 
> TableEnvironment, even though the TableEnvironment could be retrieved from 
> the Table instance given to fit/transform.
> 6) A Pipeline is currently both a Transformer and an Estimator. The 
> experience of using a Pipeline is inconsistent with the experience of using 
> an Estimator (with the needFit API).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24277) Offset commit should be disabled if consumer group ID is not specified in KafkaSource

2021-09-18 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-24277.
--
Fix Version/s: 1.13.3
   1.12.6
 Assignee: Qingsheng Ren
   Resolution: Fixed

Fixed in the following branches:

Master: 2da73edba95685537040305f30ee9d6dfd8d6c02
release-1.14: cc19997d6124e0b4f8c905601a3c98b328014f1d
release-1.13: c4c91ec579d6aaa377e1c1979ac3cb09b345c531
release-1.12: 944138eef08b8d1bb1573770865209a89e1a58bc

> Offset commit should be disabled if consumer group ID is not specified in 
> KafkaSource
> -
>
> Key: FLINK-24277
> URL: https://issues.apache.org/jira/browse/FLINK-24277
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.12.6, 1.13.3
>
>
> FLINK-24051 made "group.id" an optional configuration in KafkaSource. 
> However, KafkaSource will generate a random group ID if the user doesn't 
> specify one, and this random ID is inconsistent across failovers and not even 
> human readable.
> A solution is to add a configuration for offset commit on checkpoint, enable 
> it by default, and disable the offset commit if the group ID is not 
> specified. 
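> A minimal sketch of the resulting user-facing configuration, assuming the 
> property key {{commit.offsets.on.checkpoint}} (broker, topic, and group names 
> are made up):
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> 
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("broker:9092")
>         .setTopics("my-topic")
>         .setGroupId("my-group") // with a group ID, offsets commit on checkpoint
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         // Explicit opt-out; when no group ID is set, the offset commit is
>         // disabled automatically instead of using a random group ID.
>         .setProperty("commit.offsets.on.checkpoint", "false")
>         .build();
> {code}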



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-24059) SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation

2021-09-15 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-24059.
--
Resolution: Fixed

Merged to master: d4c483fadd3df32045fbb2ee117d0a6eeab9276e

Cherry-picked to release-1.14: 8d148a8b7832fcefefa4818de8e700562f0ffd26

> SourceReaderTestBase should allow NUM_SPLITS to be overridden in 
> implementation
> ---
>
> Key: FLINK-24059
> URL: https://issues.apache.org/jira/browse/FLINK-24059
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.13.2
>Reporter: Brian Zhou
>Assignee: Brian Zhou
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> The Pravega Flink connector is implementing the FLIP-27 sources and maps the 
> Pravega reader onto the split, which leads to a one-to-one mapping between 
> source reader and splits. For unit tests, Flink offers the 
> {{SourceReaderTestBase}} class to make testing easier, but its NUM_SPLITS 
> constant is declared {{final}} with a fixed value of 10, which makes it 
> hard for us to integrate.
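> A minimal sketch of what the fix enables, assuming the base class exposes the 
> split count through an overridable hook (the {{getNumSplits()}} method name 
> and the Pravega types are assumptions here):
> {code:java}
> // Hypothetical subclass in the Pravega connector tests: with NUM_SPLITS no
> // longer hard-wired to 10, a reader-per-split connector can run the suite
> // with a single split.
> public abstract class PravegaSourceReaderTest extends SourceReaderTestBase<PravegaSplit> {
> 
>     @Override
>     protected int getNumSplits() {
>         return 1; // one source reader maps to exactly one split
>     }
> 
>     // ... the remaining factory methods of the test base go here ...
> }
> {code}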



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23773) KafkaPartitionSplitReader should remove empty splits from fetcher

2021-09-13 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-23773.
--
Resolution: Fixed

PR merged to master: fe17ca6042c570ce603bf4308775f61db1d515c9
cherry-picked to release-1.14: b26f7e7f5a0f1accda991a9304afa49369f5c553
cherry-picked to release-1.13: 763ac52092ba70dfef989d18b711400b437e6e09

 

> KafkaPartitionSplitReader should remove empty splits from fetcher
> -
>
> Key: FLINK-23773
> URL: https://issues.apache.org/jira/browse/FLINK-23773
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.3
>
>
> Currently if a {{KafkaPartitionSplit}} is empty (startingOffset >= 
> stoppingOffset), the split reader only unsubscribes it from the consumer but 
> doesn't remove it from the SplitFetcher. This leads to the consumer 
> complaining that some partitions are not subscribed while fetching.
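> A minimal sketch of the fix direction, assuming the {{RecordsBySplits}} 
> helper from flink-connector-base is used to report empty splits as finished 
> so the fetcher drops them (field and method names are made up):
> {code:java}
> import java.util.Collections;
> import java.util.HashSet;
> import java.util.Set;
> import org.apache.flink.connector.base.source.reader.RecordsBySplits;
> import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> 
> // Inside the split reader (sketch): remember empty splits on assignment and
> // report them as finished on the next fetch() so the SplitFetcher removes
> // them instead of leaving them dangling.
> private final Set<String> emptySplits = new HashSet<>();
> 
> public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() {
>     if (!emptySplits.isEmpty()) {
>         Set<String> finished = new HashSet<>(emptySplits);
>         emptySplits.clear();
>         return new RecordsBySplits<>(Collections.emptyMap(), finished);
>     }
>     return pollFromConsumer(); // made-up name for the normal poll path
> }
> {code}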



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23971) PulsarSourceITCase.testIdleReader failed on azure

2021-09-01 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-23971.
--
Resolution: Fixed

PR merged to master: ad052cc056c7d6e63d8356dbd22d6a98b54743c3

Cherry-picked to release-1.14: 267b863683b23b8b3df29bee55ac58a25ca1fcd0

> PulsarSourceITCase.testIdleReader failed on azure
> -
>
> Key: FLINK-23971
> URL: https://issues.apache.org/jira/browse/FLINK-23971
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Roman Khachatryan
>Assignee: Hang Ruan
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
> Attachments: error.log
>
>
> {code:java}
> [INFO] Running org.apache.flink.connector.pulsar.source.PulsarSourceITCase
> [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 353.527 s <<< FAILURE! - in 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase
> [ERROR] testIdleReader{TestEnvironment, ExternalContext}[2]  Time elapsed: 
> 4.549 s  <<< FAILURE!
> java.lang.AssertionError:
> Expected: Records consumed by Flink should be identical to test data and 
> preserve the order in multiple splits
>  but: Unexpected record 'tj7MpFRWX95GzBpSF3CCjxKSal6bRhR0aF'
>at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>at 
> org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testIdleReader(SourceTestSuiteBase.java:193)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22819=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24448]
> This is the same error as in FLINK-23828 (kafka).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-24059) SourceReaderTestBase should allow NUM_SPLITS to be overridden in implementation

2021-08-30 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-24059:


Assignee: Brian Zhou

> SourceReaderTestBase should allow NUM_SPLITS to be overridden in 
> implementation
> ---
>
> Key: FLINK-24059
> URL: https://issues.apache.org/jira/browse/FLINK-24059
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.13.2
>Reporter: Brian Zhou
>Assignee: Brian Zhou
>Priority: Minor
> Fix For: 1.14.0
>
>
> The Pravega Flink connector is implementing the FLIP-27 sources and maps the 
> Pravega reader onto the split, which leads to a one-to-one mapping between 
> source reader and splits. For unit tests, Flink offers the 
> {{SourceReaderTestBase}} class to make testing easier, but its NUM_SPLITS 
> constant is declared {{final}} with a fixed value of 10, which makes it 
> hard for us to integrate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-23686) KafkaSource metric "commitsSucceeded" should count per-commit instead of per-partition

2021-08-30 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-23686.
--
Resolution: Fixed

Merged to master / release-1.14: 62931a1665e6a6976d088ed49375f9fdf00229d9

Cherry picked to 1.13: 5ce61a31ff8a184ce3f8457471ffc6f5f4439b5d

> KafkaSource metric "commitsSucceeded" should count per-commit instead of 
> per-partition
> --
>
> Key: FLINK-23686
> URL: https://issues.apache.org/jira/browse/FLINK-23686
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.3
>
>
> Currently if a successful offset commit includes multiple topic partitions 
> (let's say 4), the counter will increase by 4 instead of 1.
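> A minimal sketch of the intended counting, incrementing once per commit 
> callback rather than once per partition (the counter fields are made up):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.clients.consumer.OffsetCommitCallback;
> import org.apache.kafka.common.TopicPartition;
> 
> // commitsSucceeded / commitsFailed: Flink Counters from the reader's metric
> // group (sketch). One successful callback counts as one commit, regardless
> // of how many topic partitions the commit covered.
> OffsetCommitCallback callback =
>         (Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
>             if (exception == null) {
>                 commitsSucceeded.inc(); // was: inc() inside a per-partition loop
>             } else {
>                 commitsFailed.inc();
>             }
>         };
> {code}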



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-22702) KafkaSourceITCase.testRedundantParallelism failed

2021-08-26 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-22702.
--
Resolution: Fixed

Merged to master. 
83b9ee8a3afd3e3c5512b4a495f97c01c1be11c2

> KafkaSourceITCase.testRedundantParallelism failed
> -
>
> Key: FLINK-22702
> URL: https://issues.apache.org/jira/browse/FLINK-22702
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.12.3
>Reporter: Guowei Ma
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18107=logs=1fc6e7bf-633c-5081-c32a-9dea24b05730=80a658d1-f7f6-5d93-2758-53ac19fd5b19=6847
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   ... 1 more
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any 
> topics or assigned any partitions
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>   at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
>   ... 6 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22766) Report metrics of KafkaConsumer in Kafka new source

2021-07-12 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17379537#comment-17379537
 ] 

Jiangjie Qin commented on FLINK-22766:
--

Merged to release-1.13: 2c455f324b9ec7ef053253cf4904413b1e5f7a98

> Report metrics of KafkaConsumer in Kafka new source
> ---
>
> Key: FLINK-22766
> URL: https://issues.apache.org/jira/browse/FLINK-22766
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> Currently the new Kafka source doesn't register the metrics of the 
> KafkaConsumer in KafkaPartitionSplitReader. These metrics should be added for 
> debugging and monitoring purposes. 
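> A minimal sketch of the registration, wrapping each client-side Kafka metric 
> into a Flink gauge (the metric group wiring is simplified):
> {code:java}
> import java.util.Map;
> import org.apache.flink.metrics.Gauge;
> import org.apache.flink.metrics.MetricGroup;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.Metric;
> import org.apache.kafka.common.MetricName;
> 
> static void registerConsumerMetrics(KafkaConsumer<?, ?> consumer, MetricGroup group) {
>     // Expose every KafkaConsumer metric as a Flink gauge so it shows up in
>     // Flink's metric reporters next to the source's own metrics.
>     for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
>         Metric metric = entry.getValue();
>         group.gauge(entry.getKey().name(), (Gauge<Object>) metric::metricValue);
>     }
> }
> {code}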



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16634) The PartitionDiscoverer in FlinkKafkaConsumer should not use the user provided client.id.

2021-07-07 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17377082#comment-17377082
 ] 

Jiangjie Qin commented on FLINK-16634:
--

[~liufangliang] Done.

> The PartitionDiscoverer in FlinkKafkaConsumer should not use the user 
> provided client.id.
> -
>
> Key: FLINK-16634
> URL: https://issues.apache.org/jira/browse/FLINK-16634
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Jiangjie Qin
>Assignee: Fangliang Liu
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> The {{PartitionDiscoverer}} creates a {{KafkaConsumer}} using the client.id 
> from the user-provided properties. This may cause the MBean to collide with 
> the fetching {{KafkaConsumer}}. The {{PartitionDiscoverer}} should use a 
> unique client.id instead, such as "PartitionDiscoverer-RANDOM_LONG"
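> A minimal sketch of the suggested fix: copy the user properties and override 
> the client.id before creating the discovery consumer ({{userProvidedProps}} 
> stands for the user's consumer configuration):
> {code:java}
> import java.util.Properties;
> import java.util.concurrent.ThreadLocalRandom;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> 
> Properties discoveryProps = new Properties();
> discoveryProps.putAll(userProvidedProps);
> // A unique client.id keeps the discovery consumer's MBean from colliding
> // with the fetching consumer's MBean.
> discoveryProps.setProperty(
>         ConsumerConfig.CLIENT_ID_CONFIG,
>         "PartitionDiscoverer-" + ThreadLocalRandom.current().nextLong());
> KafkaConsumer<byte[], byte[]> discoveryConsumer = new KafkaConsumer<>(discoveryProps);
> {code}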



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16634) The PartitionDiscoverer in FlinkKafkaConsumer should not use the user provided client.id.

2021-07-07 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-16634:


Assignee: Fangliang Liu

> The PartitionDiscoverer in FlinkKafkaConsumer should not use the user 
> provided client.id.
> -
>
> Key: FLINK-16634
> URL: https://issues.apache.org/jira/browse/FLINK-16634
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Jiangjie Qin
>Assignee: Fangliang Liu
>Priority: Major
>  Labels: auto-unassigned, stale-major
>
> The {{PartitionDiscoverer}} creates a {{KafkaConsumer}} using the client.id 
> from the user-provided properties. This may cause the MBean to collide with 
> the fetching {{KafkaConsumer}}. The {{PartitionDiscoverer}} should use a 
> unique client.id instead, such as "PartitionDiscoverer-RANDOM_LONG"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-22766) Report metrics of KafkaConsumer in Kafka new source

2021-07-05 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-22766.
--
Resolution: Fixed

Merged to master: b094a932845db5539fc07b032d49d0bcefd15df2

> Report metrics of KafkaConsumer in Kafka new source
> ---
>
> Key: FLINK-22766
> URL: https://issues.apache.org/jira/browse/FLINK-22766
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> Currently the new Kafka source doesn't register the metrics of the 
> KafkaConsumer in KafkaPartitionSplitReader. These metrics should be added for 
> debugging and monitoring purposes. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22766) Report metrics of KafkaConsumer in Kafka new source

2021-07-05 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-22766:


Assignee: Qingsheng Ren

> Report metrics of KafkaConsumer in Kafka new source
> ---
>
> Key: FLINK-22766
> URL: https://issues.apache.org/jira/browse/FLINK-22766
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0, 1.13.2
>
>
> Currently the new Kafka source doesn't register the metrics of the 
> KafkaConsumer in KafkaPartitionSplitReader. These metrics should be added for 
> debugging and monitoring purposes. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-22147) Refactor Partition Discovery Logic in KafkaSourceEnumerator

2021-06-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-22147.
--
Fix Version/s: 1.14.0
   Resolution: Implemented

Merged to master. 
1418a1ddd025adb3b502b8d7a89d0f338aa40c29

> Refactor Partition Discovery Logic in KafkaSourceEnumerator
> ---
>
> Key: FLINK-22147
> URL: https://issues.apache.org/jira/browse/FLINK-22147
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Currently the logic of partition discovery is: the worker thread checks if 
> there are new partitions and initializes new splits if so, then the 
> coordinator thread marks these splits as pending and tries to make 
> assignments.
> Under the current design, the worker thread needs to keep an internal data 
> structure tracking already discovered partitions, which duplicates the 
> pending splits + assigned partitions tracked by the coordinator thread. 
> Usually this kind of double-bookkeeping is fragile. 
> Another issue is that the worker thread always fetches descriptions of ALL 
> topics at partition discovery, which becomes a problem when working with 
> giant Kafka clusters with millions of topics/partitions. 
> In order to fix the issues above, a refactor is needed for the partition 
> discovery logic in the Kafka enumerator. Basically the logic can be changed 
> to:
>  # The worker thread fetches descriptions of subscribed topics/partitions, 
> then hands over to the coordinator thread
>  # The coordinator thread filters out already discovered partitions (pending 
> + assigned partitions), then invokes the worker thread with {{callAsync}} to 
> fetch offsets for new partitions
>  # The worker thread fetches offsets and creates splits for new partitions, 
> then hands over the new splits to the coordinator thread
>  # The coordinator thread marks these splits as pending and tries to make 
> the assignment. 
> Discussion of this issue can be found in 
> [https://github.com/apache/flink/pull/15461] .
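> A minimal sketch of that handoff, using the enumerator context's 
> {{callAsync}} so steps 1 and 3 run in the worker thread and steps 2 and 4 in 
> the coordinator thread (the discovery and initialization method names are 
> made up):
> {code:java}
> // In the enumerator's start() (sketch): step 1 runs periodically in the
> // worker thread, step 2 in the coordinator thread.
> context.callAsync(
>         this::discoverSubscribedPartitions, // worker: fetch topic descriptions
>         this::checkPartitionChanges,        // coordinator: filter known partitions
>         initialDiscoveryDelayMs,
>         partitionDiscoveryIntervalMs);
> 
> private void checkPartitionChanges(Set<TopicPartition> fetched, Throwable t) {
>     if (t != null) {
>         throw new FlinkRuntimeException("Failed to discover partitions", t);
>     }
>     Set<TopicPartition> newPartitions = filterOutDiscoveredPartitions(fetched);
>     // Steps 3 + 4: the worker fetches offsets and builds the splits, then
>     // the coordinator marks them pending and makes the assignment.
>     context.callAsync(
>             () -> initializeSplits(newPartitions), this::handleNewSplits);
> }
> {code}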



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22147) Refactor Partition Discovery Logic in KafkaSourceEnumerator

2021-06-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-22147:


Assignee: Qingsheng Ren

> Refactor Partition Discovery Logic in KafkaSourceEnumerator
> ---
>
> Key: FLINK-22147
> URL: https://issues.apache.org/jira/browse/FLINK-22147
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Currently the logic of partition discovery is: the worker thread checks if 
> there's new partitions and initialize new splits if so, then coordinator 
> thread marks these splits as pending and try to make assignments.
> Under current design, the worker thread needs to keep an internal data 
> structure tracking already discovered partitions, which is duplicated with 
> pending splits + assigned partitions tracked by coordinator thread. Usually 
> this kind of double-bookkeeping is fragile. 
> Another issue is that the worker thread always fetches descriptions of ALL 
> topics at partition discovery, which will comes to a problem working with a 
> giant Kafka clusters with millions of topics/partitions. 
> In order to fix issues above, a refactor is needed for the partition 
> discovery logic in Kafka enumerator. Basically the logic can be changed to:
>  # The worker thread fetches descriptions of subscribed topics/partitions, 
> then hands over to coordinator thread
>  # The coordinator thread filters out already discovered partitions (pending 
> + assigned partitions), then invokes worker thread with {{callAsync}} to 
> fetch offsets for new partitions
>  #  The worker thread fetches offsets and creates splits for new partitions, 
> then hands over new splits to coordinator thread
>  # The coordinator thread marks these splits as pending and try to make 
> assignment. 
> Discussion of this issue can be found in 
> [https://github.com/apache/flink/pull/15461] .



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22722) Add Documentation for Kafka New Source

2021-05-31 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-22722:


Assignee: Qingsheng Ren

> Add Documentation for Kafka New Source
> --
>
> Key: FLINK-22722
> URL: https://issues.apache.org/jira/browse/FLINK-22722
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Documentation describing the usage of Kafka FLIP-27 new source is required in 
> Flink documentations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-22722) Add Documentation for Kafka New Source

2021-05-31 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-22722.
--
Resolution: Fixed

Merged to master: 

b582991b8b2b8dadb89e71d5002c4a9cc2055e34

> Add Documentation for Kafka New Source
> --
>
> Key: FLINK-22722
> URL: https://issues.apache.org/jira/browse/FLINK-22722
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Documentation describing the usage of Kafka FLIP-27 new source is required in 
> Flink documentations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22133) SplitEnumerator does not provide checkpoint id in snapshot

2021-05-07 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340604#comment-17340604
 ] 

Jiangjie Qin commented on FLINK-22133:
--

Thanks for the explanation, Thomas. So here are the pros and cons of 
backporting if I understand correctly:
 * Pros: those who develop sources on top of 1.12.4 would not experience an 
API change.
 * Cons: We break the version contract. Some other users who have already 
implemented the source may get a surprise when upgrading to 1.12.4. (These 
users will still need to pick up the new API if they go to 1.13+. It is just 
that such API changes in 1.13+ do not break our API contract with the users.)

May I first confirm whether "PublicEvolving" APIs are allowed to change between 
minor versions? If it is allowed, then I think backporting totally makes sense. 
Otherwise, personally I feel keeping the contract is more beneficial because we 
don't really know who is going to develop on top of 1.12.4 yet, but there are 
users who have already implemented the Source in 1.12.3-. So breaking the 
contract potentially benefits some users but will surely surprise others.

If we do want to backport the feature to unblock some users who really need the 
checkpoint ID, would it be better to do what Stephan suggested for 1.12?
{quote}... we add a new default method {{snapshotState(long checkpointId)}} 
which calls the method {{snapshotState()}}. The default method is the one that 
the system calls. So everything keeps working as currently, but users can 
override the default method (although they still have to override the other 
method as well with an empty body, to make the code compile).
{quote}
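A minimal sketch of that suggestion against the 1.12 interface (simplified; 
only the snapshot methods are shown):
{code:java}
public interface SplitEnumerator<SplitT, CheckpointT> {

    // Existing 1.12 method, kept so current implementations still compile.
    CheckpointT snapshotState() throws Exception;

    // New method the runtime would call. By default it delegates to the old
    // one, so behavior only changes for implementations that override it.
    default CheckpointT snapshotState(long checkpointId) throws Exception {
        return snapshotState();
    }
}
{code}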

> SplitEnumerator does not provide checkpoint id in snapshot
> --
>
> Key: FLINK-22133
> URL: https://issues.apache.org/jira/browse/FLINK-22133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Brian Zhou
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> In the ExternallyInducedSource API, the checkpoint trigger exposes the 
> checkpoint ID for the external client to identify the checkpoint. However, in 
> the FLIP-27 source, SplitEnumerator::snapshotState() is a no-arg method, so 
> the connector cannot track the checkpoint ID from Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22133) SplitEnumerator does not provide checkpoint id in snapshot

2021-05-06 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340493#comment-17340493
 ] 

Jiangjie Qin commented on FLINK-22133:
--

[~thw] I think we only plan to backport some backwards-incompatible changes to 
the Kafka source in 1.12.x, which is otherwise not usable. We did not introduce 
backwards-incompatible changes to the general Source API in 1.12.x.

I am wondering if this patch is generally useful enough to justify the 
backport.  So far it seems the checkpoint ID is only used by Pravega at this 
point. I know some users have already developed a few sources on top of release 
1.12. Backporting this change would break them immediately when they upgrade to 
1.12.4. On the other hand, given that SplitEnumerator API is PublicEvolving, 
users would probably be OK with this minor API change when they upgrade from 
1.12 to 1.13+.

What do you think?

> SplitEnumerator does not provide checkpoint id in snapshot
> --
>
> Key: FLINK-22133
> URL: https://issues.apache.org/jira/browse/FLINK-22133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Brian Zhou
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> In the ExternallyInducedSource API, the checkpoint trigger exposes the 
> checkpoint ID for the external client to identify the checkpoint. However, in 
> the FLIP-27 source, SplitEnumerator::snapshotState() is a no-arg method, so 
> the connector cannot track the checkpoint ID from Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20114) Test Kafka Source based on the new Source API 

2021-04-29 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335209#comment-17335209
 ] 

Jiangjie Qin commented on FLINK-20114:
--

[~trohrmann] Sorry for the confusion. The ticket was left open because we were 
planning to backport the PR to release 1.12. But there are API changes, so I 
started a discussion thread on the mailing list. The ticket can be closed once 
we reach consensus on whether it should be backported or not. I'll follow up on 
the mailing list thread.

> Test Kafka Source based on the new Source API 
> --
>
> Key: FLINK-20114
> URL: https://issues.apache.org/jira/browse/FLINK-20114
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Affects Versions: 1.12.0
>Reporter: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, release-testing
> Fix For: 1.13.0, 1.12.4
>
> Attachments: Screenshot 2020-11-24 at 15.14.35.png, first-run.tgz, 
> second-run.tgz
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-18323
>  
> [General Information about the Flink 1.12 release 
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced 
> errors etc.
> If you find a problem during testing, please file a ticket 
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing, and once you are finished, please write a short summary 
> of all things you have tested.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-11681) Add an AbstractMetric to combine the metric definition and metric management.

2021-04-28 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-11681.

Resolution: Abandoned

After some discussion, we have not reached consensus on whether this is 
necessary. Closing the ticket at this point. We can revisit the metric 
abstraction if we find issues with it.

> Add an AbstractMetric to combine the metric definition and metric management.
> -
>
> Key: FLINK-11681
> URL: https://issues.apache.org/jira/browse/FLINK-11681
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently the metric definitions are spread over many different components. 
> It would be useful to have a class that combines the metric definition and 
> management. This ticket is to create such an abstraction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10806) Support multiple consuming offsets when discovering a new topic

2021-04-28 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335079#comment-17335079
 ] 

Jiangjie Qin commented on FLINK-10806:
--

After FLIP-27, the DataStream source allows users to specify arbitrary offsets 
to start reading from with a custom {{OffsetsInitializer}}. We may need to 
expose that option in the Table Source as well. Ping [~jark]
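For reference, a minimal sketch of what the DataStream API already allows 
(broker, topic, and group names are made up):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("new-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // Start from a timestamp instead of from the earliest offset, so a
        // newly added topic is not replayed from the very beginning.
        .setStartingOffsets(OffsetsInitializer.timestamp(1_618_000_000_000L))
        .build();
{code}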

> Support multiple consuming offsets when discovering a new topic
> ---
>
> Key: FLINK-10806
> URL: https://issues.apache.org/jira/browse/FLINK-10806
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.6.2, 1.8.1
>Reporter: Jiayi Liao
>Priority: Major
>  Labels: auto-unassigned
>
> In KafkaConsumerBase, we discover the TopicPartitions and compare them with 
> the restoredState. That is reasonable when a topic's partitions are scaled. 
> However, if we add a new topic that has a lot of data and restore the Flink 
> program, the data of the new topic will be consumed from the start, which may 
> not be what we want. I think this should be an option for developers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16634) The PartitionDiscoverer in FlinkKafkaConsumer should not use the user provided client.id.

2021-04-28 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-16634:


Assignee: Qingsheng Ren

> The PartitionDiscoverer in FlinkKafkaConsumer should not use the user 
> provided client.id.
> -
>
> Key: FLINK-16634
> URL: https://issues.apache.org/jira/browse/FLINK-16634
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: Jiangjie Qin
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: auto-unassigned
>
> The {{PartitionDiscoverer}} creates a {{KafkaConsumer}} using the client.id 
> from the user-provided properties. This may cause the MBean to collide with 
> the fetching {{KafkaConsumer}}. The {{PartitionDiscoverer}} should use a 
> unique client.id instead, such as "PartitionDiscoverer-RANDOM_LONG"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-20108) SingleThreadFetcherManager may add splits to a shutting down SplitFetcher

2021-04-28 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-20108.

Fix Version/s: (was: 1.11.4)
   1.11.3
   Resolution: Fixed

FLINK-18128 has been backported to 1.11.3.

e72e48533902fe6a7271310736584e77b64d05b8

> SingleThreadFetcherManager may add splits to a shutting down SplitFetcher
> -
>
> Key: FLINK-20108
> URL: https://issues.apache.org/jira/browse/FLINK-20108
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.11.2
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: auto-unassigned
> Fix For: 1.11.3
>
>
> Currently the split fetchers are only removed from 
> {{SplitFetcherManager.fetchers}} when the thread exits. This may cause 
> problems because when {{SplitFetcherManager.addSplits()}} is called, it may 
> see a shutting-down split fetcher and add splits to it. These splits will 
> then just be lost.
> This issue is actually already fixed in FLINK-18128. The fix needs to be 
> cherry-picked to 1.11.3.
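> For illustration, a minimal sketch of the intended guard (simplified; the 
> method names follow the fetcher manager internals but are abridged here):
> {code:java}
> // Sketch: hand new splits only to a fetcher that is known to be alive, and
> // otherwise create a fresh one, so splits are never added to a fetcher that
> // will exit without processing them.
> public void addSplits(List<SplitT> splitsToAdd) {
>     SplitFetcher<E, SplitT> fetcher = getRunningFetcher(); // null if none alive
>     if (fetcher == null) {
>         fetcher = createSplitFetcher();
>         fetcher.addSplits(splitsToAdd);
>         startFetcher(fetcher);
>     } else {
>         fetcher.addSplits(splitsToAdd);
>     }
> }
> {code}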



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2021-04-24 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331360#comment-17331360
 ] 

Jiangjie Qin commented on FLINK-11654:
--

[~1wc] Technically speaking we still need a FLIP for this because it is a user 
sensible change. This FLIP is likely to be a quick one, though. We can create 
another Jira ticket and link that to this one.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
>  Labels: stale-assigned
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-12396) KafkaITCase.testOneSourceMultiplePartitions doesn't fail properly

2021-04-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-12396.
--
Resolution: Fixed

Closing the ticket as we haven't seen this issue for over a year.

> KafkaITCase.testOneSourceMultiplePartitions doesn't fail properly
> -
>
> Key: FLINK-12396
> URL: https://issues.apache.org/jira/browse/FLINK-12396
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Priority: Major
>  Labels: stale-major
>
> https://api.travis-ci.org/v3/job/527599974/log.txt
> In the log, we can see that KafkaITCase.testOneSourceMultiplePartitions 
> failed, but it kept running and taking all the snapshots, which caused the 
> build to time out.
> {code:java}
> 05:00:38,896 INFO  
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink
>   - Snapshot of counter 4800 at checkpoint 115
> 05:00:39,050 ERROR org.apache.flink.streaming.connectors.kafka.KafkaITCase
>- 
> 
> Test 
> testOneSourceMultiplePartitions(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
>  failed with:
> org.junit.runners.model.TestTimedOutException: test timed out after 60000 
> milliseconds
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneSourceMultiplePartitionsExactlyOnceTest(KafkaConsumerTestBase.java:924)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneSourceMultiplePartitions(KafkaITCase.java:102)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 05:00:39,057 INFO  org.apache.flink.streaming.connectors.kafka.KafkaITCase
>- 
> 
> Test 
> testCancelingFullTopic(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
>  is running.
> 
> 05:00:39,396 INFO  
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink
>   - Snapshot of counter 4800 at checkpoint 116
> 05:00:39,896 INFO  
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink
>   - Snapshot of counter 4800 at checkpoint 117
> 05:00:40,396 INFO  
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink
>   - Snapshot of counter 4800 at checkpoint 118
> 05:00:40,896 INFO  
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink
>   - Snapshot of counter 4800 at checkpoint 119
> 05:00:41,396 INFO  
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink
>   - Snapshot of counter 4800 at checkpoint 120
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22357) Mark FLIP-27 Source API as stable

2021-04-24 Thread Jiangjie Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331354#comment-17331354
 ] 

Jiangjie Qin commented on FLINK-22357:
--

[~sewen] Thanks for the proposal. The plan sounds good to me.

I think the only thing left in the FLIP-27 source is adding the common metrics 
implementation to the FLIP-27 base SourceReader.
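For reference, here is a minimal sketch of what such a common metrics implementation could look like. The counter names follow the FLIP-33 standard metric names, but the {{RecordCountingEmitter}} class and its wiring into the base SourceReader are hypothetical, purely to illustrate the idea:

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical helper illustrating the common metrics idea: every record
// emitted through the base SourceReader bumps the FLIP-33 standard counters.
public final class RecordCountingEmitter {

    private final Counter numRecordsIn;
    private final Counter numBytesIn;

    public RecordCountingEmitter(MetricGroup readerMetricGroup) {
        // "numRecordsIn" / "numBytesIn" are the FLIP-33 standard metric names.
        this.numRecordsIn = readerMetricGroup.counter("numRecordsIn");
        this.numBytesIn = readerMetricGroup.counter("numBytesIn");
    }

    /** To be called once per emitted record, with the record's serialized size. */
    public void onRecordEmitted(long serializedSizeInBytes) {
        numRecordsIn.inc();
        numBytesIn.inc(serializedSizeInBytes);
    }
}
{code}

The base reader would call {{onRecordEmitted()}} from its record emission path, so every connector built on top of it would get the standard counters without connector-specific code.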

> Mark FLIP-27 Source API as stable
> -
>
> Key: FLINK-22357
> URL: https://issues.apache.org/jira/browse/FLINK-22357
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.14.0
>
>
> The FLIP-27 source API was introduced in 1.11 and has undergone some 
> major improvements in 1.12.
> During the stabilization in 1.13 we needed only one very minor change to 
> those interfaces.
> I think it is time to declare the core source API interfaces stable, to 
> allow users to safely rely on them. I would suggest doing that for 1.14, 
> and possibly even backporting the annotation change to 1.13.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-13052) Supporting multi-topic when using kafkaTableSourceSinkFactoryBase.createStreamTableSource

2021-04-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-13052:


Assignee: Qingsheng Ren

> Supporting multi-topic when using 
> kafkaTableSourceSinkFactoryBase.createStreamTableSource
> -
>
> Key: FLINK-13052
> URL: https://issues.apache.org/jira/browse/FLINK-13052
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / API
>Affects Versions: 1.8.0
>Reporter: chaiyongqiang
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: stale-major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-11793) Make KafkaProducer more resilient to Kafka Broker Failures

2021-04-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-11793:


Assignee: Qingsheng Ren

> Make KafkaProducer more resilient to Kafka Broker Failures
> --
>
> Key: FLINK-11793
> URL: https://issues.apache.org/jira/browse/FLINK-11793
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Konstantin Knauf
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: stale-major
>
> Similar to FLINK-11792, we can make the FlinkKafkaProducer more resilient 
> against broker failures by not immediately failing on certain 
> {{KafkaException}}s, but by retrying in case there is only a change in 
> partition leadership.
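A minimal sketch of that retry idea (not the actual Flink implementation): inspect the exception in the producer's send callback and re-send the record when the failure only indicates a leadership change, which the Kafka client resolves by refreshing its metadata. The class and the error propagation via {{AtomicReference}} are hypothetical:

{code:java}
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;

// Hypothetical callback: NOT_LEADER_FOR_PARTITION only means the partition
// leader moved, so the record is re-sent instead of failing the job.
public final class LeadershipChangeRetryingCallback implements Callback {

    private final Producer<byte[], byte[]> producer;
    private final ProducerRecord<byte[], byte[]> record;
    private final int retriesLeft;
    private final AtomicReference<Exception> asyncError;

    public LeadershipChangeRetryingCallback(
            Producer<byte[], byte[]> producer,
            ProducerRecord<byte[], byte[]> record,
            int retriesLeft,
            AtomicReference<Exception> asyncError) {
        this.producer = producer;
        this.record = record;
        this.retriesLeft = retriesLeft;
        this.asyncError = asyncError;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            return; // send succeeded
        }
        if (exception instanceof NotLeaderForPartitionException && retriesLeft > 0) {
            // Re-send; the producer refreshes its metadata and finds the new leader.
            producer.send(record, new LeadershipChangeRetryingCallback(
                    producer, record, retriesLeft - 1, asyncError));
        } else {
            // Hand the error to the main thread, which can fail the job.
            asyncError.compareAndSet(null, exception);
        }
    }
}
{code}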



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-11736) flink kafka producer failed with NOT_LEADER_FOR_PARTITION

2021-04-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-11736.

Resolution: Not A Bug

Closing the issue as Not A Bug because it is a user configuration issue.

> flink kafka producer failed with NOT_LEADER_FOR_PARTITION
> -
>
> Key: FLINK-11736
> URL: https://issues.apache.org/jira/browse/FLINK-11736
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Kaicheng Sun
>Priority: Major
>  Labels: stale-major
>
> My Flink program connects to Kafka as its sink using the 
> "FlinkKafkaProducer011" library. But sometimes Flink exits abnormally with 
> this kind of error:
> (2019-02-23 11:55:11,656 WARN  
> org.apache.kafka.clients.producer.internals.Sender- Got error 
> produce response with correlation id 55711 on topic-partition 
> tmp_sink_redis-17, retrying (8 attempts left). Error: NOT_LEADER_FOR_PARTITION
> 2019-02-23 11:55:11,658 WARN  
> org.apache.kafka.clients.producer.internals.Sender- Got error 
> produce response with correlation id 55712 on topic-partition 
> tmp_sink_redis-17, retrying (8 attempts left). Error: 
> NOT_LEADER_FOR_PARTITION)
>  
> The Kafka cluster works properly, so I have no idea why this error happens



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-10876) Deadlock if closing firstly pending transactions in FlinkKafkaProducer(011).close()

2021-04-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-10876.

Resolution: Fixed

Closing the issue as it has not occurred since FLINK-10455 was fixed.

> Deadlock if closing firstly pending transactions in 
> FlinkKafkaProducer(011).close()
> ---
>
> Key: FLINK-10876
> URL: https://issues.apache.org/jira/browse/FLINK-10876
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Andrey Zagrebin
>Priority: Major
>  Labels: stale-major
>
> While working on FLINK-10455, I encountered a deadlock in 
> _FlinkKafkaProducer(011).close()_ if _pendingTransactions_ are closed before 
> _currentTransaction_. There is no deadlock the other way around.
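For illustration only, a minimal sketch of the safe close ordering the report implies; the {{Transaction}} type here is a hypothetical stand-in, not Kafka's transaction machinery:

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the ordering implied by the report: closing
// currentTransaction before the pending transactions avoids the deadlock.
public final class CloseOrderingSketch {

    static final class Transaction implements AutoCloseable {
        private final String name;

        Transaction(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            System.out.println("closed " + name);
        }
    }

    public static void main(String[] args) {
        Transaction currentTransaction = new Transaction("current");
        List<Transaction> pendingTransactions =
                Arrays.asList(new Transaction("pending-1"), new Transaction("pending-2"));

        // Safe order per the report: current first, then pending.
        currentTransaction.close();
        pendingTransactions.forEach(Transaction::close);
    }
}
{code}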



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15703) FlinkKafkaConsumer should enable partition discovery by default.

2021-04-24 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-15703.

Fix Version/s: (was: 1.13.0)
   Resolution: Won't Fix

After some discussion, we decided to keep the current behavior in the new 
KafkaSource. Users still need to enable periodic partition discovery with an 
explicit configuration.
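For example, with the new KafkaSource the discovery interval has to be set explicitly through the {{partition.discovery.interval.ms}} property; a sketch with placeholder broker address, topic, and interval:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Sketch: periodic partition discovery stays disabled unless the
// "partition.discovery.interval.ms" property is set explicitly.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("input-topic")                // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
{code}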

> FlinkKafkaConsumer should enable partition discovery by default.
> 
>
> Key: FLINK-15703
> URL: https://issues.apache.org/jira/browse/FLINK-15703
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.9.1
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: stale-major, usability
>
> In most cases, users would expect that a partition expansion in Kafka is 
> automatically picked up by Flink. So partition discovery should be 
> enabled by default in the Kafka source.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-21133) FLIP-27 Source does not work with synchronous savepoint

2021-04-19 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved FLINK-21133.
--
Resolution: Fixed

Patch merged.

master:  a9cf18b4d25f130e0bd24d51b128bbcf71892b45
release-1.12: 0913824dda91b04612f2f885052635e0ca78f5b3

> FLIP-27 Source does not work with synchronous savepoint
> ---
>
> Key: FLINK-21133
> URL: https://issues.apache.org/jira/browse/FLINK-21133
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, API / DataStream, Runtime / Checkpointing
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Kezhu Wang
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21133) FLIP-27 Source does not work with synchronous savepoint

2021-04-19 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-21133:
-
Fix Version/s: (was: 1.11.4)

> FLIP-27 Source does not work with synchronous savepoint
> ---
>
> Key: FLINK-21133
> URL: https://issues.apache.org/jira/browse/FLINK-21133
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core, API / DataStream, Runtime / Checkpointing
>Affects Versions: 1.11.3, 1.12.1
>Reporter: Kezhu Wang
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Issue Type: Bug  (was: Task)

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. 
> In case the SplitEnumerator instantiation takes a long time, the job 
> execution will time out. The fix is to move the SplitEnumerator creation to 
> the coordinator thread.
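A minimal sketch of that idea in plain Java (not the actual SourceCoordinator code): hand the potentially slow instantiation to a dedicated single-threaded executor, so the main thread only schedules the work and returns immediately.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic sketch of moving slow instantiation off the main thread; the
// enumerator type is a placeholder, not Flink's SplitEnumerator.
public final class CoordinatorThreadSketch {

    static Object createEnumeratorSlowly() throws InterruptedException {
        Thread.sleep(5_000); // e.g. an enumerator probing an external system
        return new Object();
    }

    public static void main(String[] args) {
        ExecutorService coordinatorExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "source-coordinator"));

        // The main thread only schedules the creation and moves on, so a slow
        // enumerator constructor can no longer stall job submission.
        CompletableFuture<Object> enumerator =
                CompletableFuture.supplyAsync(
                        () -> {
                            try {
                                return createEnumeratorSlowly();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new IllegalStateException(e);
                            }
                        },
                        coordinatorExecutor);

        enumerator.thenRun(() -> System.out.println("enumerator ready"));
        coordinatorExecutor.shutdown();
    }
}
{code}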



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Priority: Critical  (was: Major)

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. 
> In case the SplitEnumerator instantiation takes a long time, the job 
> execution will time out. The fix is to move the SplitEnumerator creation to 
> the coordinator thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Issue Type: Improvement  (was: Bug)

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. 
> In case the SplitEnumerator instantiation takes a long time, the job 
> execution will time out. The fix is to move the SplitEnumerator creation to 
> the coordinator thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Fix Version/s: 1.12.3
   1.13.0

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. 
> In case the SplitEnumerator instantiation takes a long time, the job 
> execution will time out. The fix is to move the SplitEnumerator creation to 
> the coordinator thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-22282:


Assignee: Jiangjie Qin

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. 
> In case the SplitEnumerator instantiation takes a long time, the job 
> execution will time out. The fix is to move the SplitEnumerator creation to 
> the coordinator thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-22282:


 Summary: Move creation of SplitEnumerator to the SourceCoordinator 
thread
 Key: FLINK-22282
 URL: https://issues.apache.org/jira/browse/FLINK-22282
 Project: Flink
  Issue Type: Task
  Components: Connectors / Common
Affects Versions: 1.12.2
Reporter: Jiangjie Qin


Currently the creation of the SplitEnumerator happens in the JM main thread. In 
case the SplitEnumerator instantiation takes a long time, the job execution 
will time out. The fix is to move the SplitEnumerator creation to the 
coordinator thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-20431) KafkaSourceReaderTest.testCommitOffsetsWithoutAliveFetchers:133->lambda$testCommitOffsetsWithoutAliveFetchers$3:134 expected:<10> but was:<1>

2021-04-07 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-20431:
-
Fix Version/s: (was: 1.13.0)
   1.13.1

> KafkaSourceReaderTest.testCommitOffsetsWithoutAliveFetchers:133->lambda$testCommitOffsetsWithoutAliveFetchers$3:134
>  expected:<10> but was:<1>
> -
>
> Key: FLINK-20431
> URL: https://issues.apache.org/jira/browse/FLINK-20431
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2, 1.13.0
>Reporter: Huang Xingbo
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.3, 1.13.1
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=10351=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5]
> [ERROR] Failures: 
> [ERROR] 
> KafkaSourceReaderTest.testCommitOffsetsWithoutAliveFetchers:133->lambda$testCommitOffsetsWithoutAliveFetchers$3:134
>  expected:<10> but was:<1>
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15493) FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator failed on travis

2021-04-07 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-15493:
-
Fix Version/s: (was: 1.13.0)
   1.13.1

> FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator
>  failed on travis
> ---
>
> Key: FLINK-15493
> URL: https://issues.apache.org/jira/browse/FLINK-15493
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.10.0, 1.13.0
>Reporter: Dian Fu
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.13.1
>
>
> FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator
>  failed on travis with the following exception:
> {code}
> Test 
> testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase)
>  failed with: org.junit.runners.model.TestTimedOutException: test timed out 
> after 3 milliseconds at java.lang.Object.wait(Native Method) at 
> java.lang.Object.wait(Object.java:502) at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:177)
>  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:115)
>  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:197)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:176)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.lang.Thread.run(Thread.java:748)
> {code}
> instance: [https://api.travis-ci.org/v3/job/633307060/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

