[jira] [Created] (FLINK-30470) Change groupId for flink-connector-parent

2022-12-20 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30470:
--

 Summary: Change groupId for flink-connector-parent
 Key: FLINK-30470
 URL: https://issues.apache.org/jira/browse/FLINK-30470
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Common
Reporter: Martijn Visser
Assignee: Chesnay Schepler


During the migration of the connectors to the external connector framework, 
we've used io.github.zentol.flink as groupId for the artifact 
flink-connector-parent to have the ability to quickly iterate on the connector 
parent. 

With the first wave of migrations and releases being completed, we should 
change this back to org.apache.flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30465) Remove flink-connector-jdbc from master branch

2022-12-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650176#comment-17650176
 ] 

Martijn Visser commented on FLINK-30465:


[~lemonjing] I've created FLINK-30470 for this

> Remove flink-connector-jdbc from master branch
> --
>
> Key: FLINK-30465
> URL: https://issues.apache.org/jira/browse/FLINK-30465
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> With JDBC being synchronized to 
> https://github.com/apache/flink-connector-jdbc we can now remove the JDBC 
> connector from {{master}} so it won't be included in the 1.17 release anymore





[jira] [Commented] (FLINK-30465) Remove flink-connector-jdbc from master branch

2022-12-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650171#comment-17650171
 ] 

Martijn Visser commented on FLINK-30465:


[~lemonjing] Good reminder; we still need to change that back. Let me create a 
ticket for that.

> Remove flink-connector-jdbc from master branch
> --
>
> Key: FLINK-30465
> URL: https://issues.apache.org/jira/browse/FLINK-30465
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> With JDBC being synchronized to 
> https://github.com/apache/flink-connector-jdbc we can now remove the JDBC 
> connector from {{master}} so it won't be included in the 1.17 release anymore





[jira] [Updated] (FLINK-30469) FLIP-266: Simplify network memory configurations for TaskManager

2022-12-20 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-30469:
--
Description: 
When using Flink, users may encounter the following issues that affect 
usability.
1. The job may fail with an "Insufficient number of network buffers" exception.
2. Flink network memory size adjustment is complex.

When encountering these issues, users can solve some problems by adding or 
adjusting parameters. However, multiple memory config options should be 
changed. The config option adjustment requires understanding the detailed 
internal implementation, which is impractical for most users.

To resolve the issues, we propose some improvement solutions. For more details 
see 
[FLIP-266|https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager].

  was:
When using Flink, users may encounter the following issues that affect 
usability.
1. The job may fail with an "Insufficient number of network buffers" exception.
2. Flink network memory size adjustment is complex.

When encountering these issues, users can solve some problems by adding or 
adjusting parameters. However, multiple memory config options should be 
changed. The config option adjustment requires understanding the detailed 
internal implementation, which is impractical for most users.

To resolve the issues, we propose some improvement solutions. For more details 
see 
[FLIP-266|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager]].


> FLIP-266: Simplify network memory configurations for TaskManager
> 
>
> Key: FLINK-30469
> URL: https://issues.apache.org/jira/browse/FLINK-30469
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.17.0
>Reporter: Yuxin Tan
>Priority: Major
>
> When using Flink, users may encounter the following issues that affect 
> usability.
> 1. The job may fail with an "Insufficient number of network buffers" 
> exception.
> 2. Flink network memory size adjustment is complex.
> When encountering these issues, users can solve some problems by adding or 
> adjusting parameters. However, multiple memory config options should be 
> changed. The config option adjustment requires understanding the detailed 
> internal implementation, which is impractical for most users.
> To resolve the issues, we propose some improvement solutions. For more 
> details see 
> [FLIP-266|https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager].





[jira] [Updated] (FLINK-30469) FLIP-266: Simplify network memory configurations for TaskManager

2022-12-20 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan updated FLINK-30469:
--
Description: 
When using Flink, users may encounter the following issues that affect 
usability.
1. The job may fail with an "Insufficient number of network buffers" exception.
2. Flink network memory size adjustment is complex.

When encountering these issues, users can solve some problems by adding or 
adjusting parameters. However, multiple memory config options should be 
changed. The config option adjustment requires understanding the detailed 
internal implementation, which is impractical for most users.

To resolve the issues, we propose some improvement solutions. For more details 
see 
[FLIP-266|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager]].

  was:
When using Flink, users may encounter the following issues that affect 
usability.
1. The job may fail with an "Insufficient number of network buffers" exception.
2. Flink network memory size adjustment is complex.


When encountering these issues, users can solve some problems by adding or 
adjusting parameters. However, multiple memory config options should be 
changed. The config option adjustment requires understanding the detailed 
internal implementation, which is impractical for most users.

To resolve the issues, we propose some improvement solutions. For more details 
see 
[FLIP-266|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager].]


> FLIP-266: Simplify network memory configurations for TaskManager
> 
>
> Key: FLINK-30469
> URL: https://issues.apache.org/jira/browse/FLINK-30469
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.17.0
>Reporter: Yuxin Tan
>Priority: Major
>
> When using Flink, users may encounter the following issues that affect 
> usability.
> 1. The job may fail with an "Insufficient number of network buffers" 
> exception.
> 2. Flink network memory size adjustment is complex.
> When encountering these issues, users can solve some problems by adding or 
> adjusting parameters. However, multiple memory config options should be 
> changed. The config option adjustment requires understanding the detailed 
> internal implementation, which is impractical for most users.
> To resolve the issues, we propose some improvement solutions. For more 
> details see 
> [FLIP-266|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager]].





[jira] [Created] (FLINK-30469) FLIP-266: Simplify network memory configurations for TaskManager

2022-12-20 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-30469:
-

 Summary: FLIP-266: Simplify network memory configurations for 
TaskManager
 Key: FLINK-30469
 URL: https://issues.apache.org/jira/browse/FLINK-30469
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.17.0
Reporter: Yuxin Tan


When using Flink, users may encounter the following issues that affect 
usability.
1. The job may fail with an "Insufficient number of network buffers" exception.
2. Flink network memory size adjustment is complex.


When encountering these issues, users can solve some problems by adding or 
adjusting parameters. However, multiple memory config options should be 
changed. The config option adjustment requires understanding the detailed 
internal implementation, which is impractical for most users.

To resolve the issues, we propose some improvement solutions. For more details 
see 
[FLIP-266|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager].]





[jira] [Commented] (FLINK-26974) Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure

2022-12-20 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650153#comment-17650153
 ] 

Matthias Pohl commented on FLINK-26974:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44116=logs=3e4dd1a2-fe2f-5e5d-a581-48087e718d53=b4612f28-e3b5-5853-8a8b-610ae894217a=28961

> Python EmbeddedThreadDependencyTests.test_add_python_file failed on azure
> -
>
> Key: FLINK-26974
> URL: https://issues.apache.org/jira/browse/FLINK-26974
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yun Gao
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> Mar 31 10:49:17 === FAILURES 
> ===
> Mar 31 10:49:17 __ 
> EmbeddedThreadDependencyTests.test_add_python_file __
> Mar 31 10:49:17 
> Mar 31 10:49:17 self = 
>  testMethod=test_add_python_file>
> Mar 31 10:49:17 
> Mar 31 10:49:17 def test_add_python_file(self):
> Mar 31 10:49:17 python_file_dir = os.path.join(self.tempdir, 
> "python_file_dir_" + str(uuid.uuid4()))
> Mar 31 10:49:17 os.mkdir(python_file_dir)
> Mar 31 10:49:17 python_file_path = os.path.join(python_file_dir, 
> "test_dependency_manage_lib.py")
> Mar 31 10:49:17 with open(python_file_path, 'w') as f:
> Mar 31 10:49:17 f.write("def add_two(a):\nraise 
> Exception('This function should not be called!')")
> Mar 31 10:49:17 self.t_env.add_python_file(python_file_path)
> Mar 31 10:49:17 
> Mar 31 10:49:17 python_file_dir_with_higher_priority = os.path.join(
> Mar 31 10:49:17 self.tempdir, "python_file_dir_" + 
> str(uuid.uuid4()))
> Mar 31 10:49:17 os.mkdir(python_file_dir_with_higher_priority)
> Mar 31 10:49:17 python_file_path_higher_priority = 
> os.path.join(python_file_dir_with_higher_priority,
> Mar 31 10:49:17 
> "test_dependency_manage_lib.py")
> Mar 31 10:49:17 with open(python_file_path_higher_priority, 'w') as f:
> Mar 31 10:49:17 f.write("def add_two(a):\nreturn a + 2")
> Mar 31 10:49:17 
> self.t_env.add_python_file(python_file_path_higher_priority)
> Mar 31 10:49:17 
> Mar 31 10:49:17 def plus_two(i):
> Mar 31 10:49:17 from test_dependency_manage_lib import add_two
> Mar 31 10:49:17 return add_two(i)
> Mar 31 10:49:17 
> Mar 31 10:49:17 self.t_env.create_temporary_system_function(
> Mar 31 10:49:17 "add_two", udf(plus_two, DataTypes.BIGINT(), 
> DataTypes.BIGINT()))
> Mar 31 10:49:17 table_sink = source_sink_utils.TestAppendSink(
> Mar 31 10:49:17 ['a', 'b'], [DataTypes.BIGINT(), 
> DataTypes.BIGINT()])
> Mar 31 10:49:17 self.t_env.register_table_sink("Results", table_sink)
> Mar 31 10:49:17 t = self.t_env.from_elements([(1, 2), (2, 5), (3, 
> 1)], ['a', 'b'])
> Mar 31 10:49:17 >   t.select(expr.call("add_two", t.a), 
> t.a).execute_insert("Results").wait()
> Mar 31 10:49:17 
> Mar 31 10:49:17 pyflink/table/tests/test_dependency.py:63: 
> Mar 31 10:49:17 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ 
> Mar 31 10:49:17 pyflink/table/table_result.py:76: in wait
> Mar 31 10:49:17 get_method(self._j_table_result, "await")()
> Mar 31 10:49:17 
> .tox/py38-cython/lib/python3.8/site-packages/py4j/java_gateway.py:1321: in 
> __call__
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34001=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=27239





[GitHub] [flink] ljz2051 commented on pull request #21516: [FLINK-28863][state]Add comments about why snapshot result of RocksDB native savepoint doesn't have empty shared-state

2022-12-20 Thread GitBox


ljz2051 commented on PR #21516:
URL: https://github.com/apache/flink/pull/21516#issuecomment-1360961571

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a diff in pull request #21388: [FLINK-30188][coordination] Set partition finished state in ConsumedPartitionGroup for dynamic graph correctly.

2022-12-20 Thread GitBox


zhuzhurk commented on code in PR #21388:
URL: https://github.com/apache/flink/pull/21388#discussion_r1054064663


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java:
##
@@ -337,6 +337,159 @@ void testRegisterConsumedPartitionGroupToEdgeManager() 
throws Exception {
 partition1.getPartitionId(), 
partition2.getPartitionId());
 }
 
+@Test
+void testPointWiseConsumedPartitionGroupPartitionFinished() throws 
Exception {
+JobVertex v1 = new JobVertex("source");
+JobVertex v2 = new JobVertex("sink");
+
+v1.setParallelism(4);
+v2.setParallelism(2);
+
+v2.connectNewDataSetAsInput(
+v1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+
+List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+ExecutionGraph eg = createDefaultExecutionGraph(ordered);
+eg.attachJobGraph(ordered);
+
+IntermediateResult result =
+
Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
+
+IntermediateResultPartition partition1 = result.getPartitions()[0];
+IntermediateResultPartition partition2 = result.getPartitions()[1];
+IntermediateResultPartition partition3 = result.getPartitions()[2];
+IntermediateResultPartition partition4 = result.getPartitions()[3];
+
+ConsumedPartitionGroup consumedPartitionGroup1 =
+partition1.getConsumedPartitionGroups().get(0);
+
+ConsumedPartitionGroup consumedPartitionGroup2 =
+partition4.getConsumedPartitionGroups().get(0);
+
+
assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isEqualTo(2);
+
assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isEqualTo(2);
+partition1.markFinished();
+partition2.markFinished();
+
assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isZero();
+partition3.markFinished();
+partition4.markFinished();
+
assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isZero();

Review Comment:
   consumedPartitionGroup1 -> consumedPartitionGroup2
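
The reasoning behind this comment can be sketched with a toy model. The snippet below is a minimal Python illustration, not Flink code: `ToyPartitionGroup` and `run_scenario` are hypothetical names, and the sketch assumes the pointwise 4-to-2 wiring from the test, where the first group holds partitions 1 and 2 and the second group holds partitions 3 and 4.

```python
class ToyPartitionGroup:
    """Hypothetical stand-in for ConsumedPartitionGroup: tracks unfinished partitions."""

    def __init__(self, partition_ids):
        self._unfinished = set(partition_ids)

    def mark_finished(self, partition_id):
        # Finishing a partition that is not in this group leaves it unchanged.
        self._unfinished.discard(partition_id)

    def number_of_unfinished_partitions(self):
        return len(self._unfinished)


def run_scenario():
    """Replay the two finishing steps of the test and record both counters."""
    group1 = ToyPartitionGroup({1, 2})
    group2 = ToyPartitionGroup({3, 4})
    groups = (group1, group2)
    observed = []

    def finish(*partition_ids):
        for partition_id in partition_ids:
            for group in groups:
                group.mark_finished(partition_id)
        observed.append(
            tuple(group.number_of_unfinished_partitions() for group in groups)
        )

    finish(1, 2)  # only group1 can reach zero here
    finish(3, 4)  # only group2 changes here
    return observed


print(run_scenario())
```

In this model the first group is already at zero after the first step, so asserting on it again after partitions 3 and 4 finish cannot fail. Only the second group's counter changes in the last step, which is why the final assertion should target `consumedPartitionGroup2`.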






[GitHub] [flink] zhuzhurk commented on a diff in pull request #21388: [FLINK-30188][coordination] Set partition finished state in ConsumedPartitionGroup for dynamic graph correctly.

2022-12-20 Thread GitBox


zhuzhurk commented on code in PR #21388:
URL: https://github.com/apache/flink/pull/21388#discussion_r1054062698


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java:
##
@@ -201,10 +206,25 @@ private static ConsumedPartitionGroup 
createAndRegisterConsumedPartitionGroupToE
 ConsumedPartitionGroup consumedPartitionGroup =
 ConsumedPartitionGroup.fromMultiplePartitions(
 numConsumers, consumedPartitions, 
intermediateResult.getResultType());
+finishAllDataProducedPartitions(
+intermediateResult, consumedPartitions, 
consumedPartitionGroup);
 registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, 
intermediateResult);
 return consumedPartitionGroup;
 }
 
+private static void finishAllDataProducedPartitions(
+IntermediateResult intermediateResult,
+List<IntermediateResultPartitionID> consumedPartitionIds,
+ConsumedPartitionGroup consumedPartitionGroup) {
+for (IntermediateResultPartitionID consumedPartitionId : 
consumedPartitionIds) {
+// this is for dynamic graph as consumedPartitionGroup has not 
been created when the
+// partition is finish.

Review Comment:
   is finish -> becomes finished






[jira] [Commented] (FLINK-19935) Supports configure heap memory of sql-client to avoid OOM

2022-12-20 Thread Elkhan Dadashov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650132#comment-17650132
 ] 

Elkhan Dadashov commented on FLINK-19935:
-

This also happens in Flink 1.16.0 when the SQL script is very large 
(>=164kb).

This is easy to reproduce with the `./bin/sql-client -f 
script_larger_than164kb.sql` command.

> Supports configure heap memory of sql-client to avoid OOM
> -
>
> Key: FLINK-19935
> URL: https://issues.apache.org/jira/browse/FLINK-19935
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Affects Versions: 1.11.2
>Reporter: harold.miao
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.17.0
>
> Attachments: image-2020-11-03-10-31-08-294.png
>
>
> Hi,
> when using sql-client to submit a job, the command below does not set JVM heap 
> parameters, which causes an OOM error in my production environment.
> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList 
> "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
> org.apache.flink.table.client.SqlClient "$@"
>  
> !image-2020-11-03-10-31-08-294.png!





[GitHub] [flink] flinkbot commented on pull request #21541: Update intro_to_table_api.md

2022-12-20 Thread GitBox


flinkbot commented on PR #21541:
URL: https://github.com/apache/flink/pull/21541#issuecomment-1360950544

   
   ## CI report:
   
   * 08dd378369069be2aa78aa9454f855d8c9b7a241 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Closed] (FLINK-29849) Event time temporal join on an upsert source may produce incorrect execution plan

2022-12-20 Thread Godfrey He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Godfrey He closed FLINK-29849.
--
Resolution: Fixed

Fixed in master: eb44ac01c9969cb22ab832b6b2155b109f015b06

> Event time temporal join on an upsert source may produce incorrect execution 
> plan
> -
>
> Key: FLINK-29849
> URL: https://issues.apache.org/jira/browse/FLINK-29849
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.3
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In the current implementation, the execution plan is incorrect when doing an 
> event time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a 
> temporal join input, or it will damage the versions of the versioned table. For 
> versioned tables, we use a single-temporal mechanism that relies on sequential 
> records of the same key to ensure the valid period of each version. If a 
> ChangelogNormalize node is added, a UB message is produced from the 
> previous UA or Insert message, with all columns identical, including the 
> event time. For example:
> original upsert input:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1  [~, '2022-11-02 10:00:00')
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize processes the input, the output is:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> versions are incorrect:
> {code}
> v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
> v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed into an event time temporal join; 
> otherwise, the filter may also corrupt the versioned table.
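
The effect described in problem 1 can be illustrated with a small model. The Python sketch below is not Flink's operator logic; it assumes a simplified rule in which every record of a key opens a period that ends at the event time of the next record of the same key. `version_periods` and `empty_periods` are hypothetical helper names.

```python
from collections import defaultdict


def version_periods(changelog):
    """Map each key to [(start, end, value), ...]; end is None for the open period.

    Each record is (kind, key, event_time, value). The kind is carried only for
    readability: in this simplified model every record opens a new period.
    """
    by_key = defaultdict(list)
    for _kind, key, event_time, value in changelog:
        by_key[key].append((event_time, value))

    periods = {}
    for key, records in by_key.items():
        spans = []
        for index, (start, value) in enumerate(records):
            has_next = index + 1 < len(records)
            end = records[index + 1][0] if has_next else None
            spans.append((start, end, value))
        periods[key] = spans
    return periods


def empty_periods(periods):
    """Return (key, span) pairs whose start equals their end, i.e. invalid periods."""
    return [
        (key, span)
        for key, spans in periods.items()
        for span in spans
        if span[0] == span[1]
    ]


upsert_input = [
    ("+I", "key1", "2022-11-02 10:00:00", "a1"),
    ("+U", "key1", "2022-11-02 10:01:03", "a2"),
]
normalized_input = [
    ("+I", "key1", "2022-11-02 10:00:00", "a1"),
    ("-U", "key1", "2022-11-02 10:00:00", "a1"),
    ("+U", "key1", "2022-11-02 10:01:03", "a2"),
]

print(empty_periods(version_periods(upsert_input)))
print(empty_periods(version_periods(normalized_input)))
```

Under this assumption the original upsert input yields no empty period, while the normalized input yields one period that starts and ends at '2022-11-02 10:00:00', matching the invalid period shown in the issue description.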





[GitHub] [flink] Water-Ghost opened a new pull request, #21541: Update intro_to_table_api.md

2022-12-20 Thread GitBox


Water-Ghost opened a new pull request, #21541:
URL: https://github.com/apache/flink/pull/21541

   There is a column name error; fixed it to "revenue".
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] godfreyhe closed pull request #21219: [FLINK-29849][table-planner] Fix event time temporal join on an upsert source may produce incorrect execution plan

2022-12-20 Thread GitBox


godfreyhe closed pull request #21219: [FLINK-29849][table-planner] Fix event 
time temporal join on an upsert source may produce incorrect execution plan
URL: https://github.com/apache/flink/pull/21219





[GitHub] [flink-ml] Fanoid commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-20 Thread GitBox


Fanoid commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1054052585


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSH.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for estimators which implement LSH (Locality-sensitive hashing) 
algorithms.
+ *
+ * The basic idea of LSH algorithms is to use to a family of hash functions 
to map data samples

Review Comment:
   `NUM_HASH_TABLES` and `NUM_HASH_FUNCTIONS_PER_TABLE` are for OR- and 
AND-amplification.
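
For readers unfamiliar with the terms, the two amplification steps can be sketched numerically. The Python snippet below is an illustration under the standard LSH model, not code from this PR: it assumes two samples collide under a single hash function with probability `p`, that they match within one table only if all functions of that table agree (AND), and that they become a candidate pair if any table matches (OR). `collision_probability` is a hypothetical helper; its parameter names only mirror the two constants mentioned above.

```python
def collision_probability(p, num_hash_tables, num_hash_functions_per_table):
    """Probability that two samples become a candidate pair.

    p: probability that one hash function maps both samples to the same value.
    """
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be a probability in [0, 1]")
    # AND-amplification: every function in a table must agree.
    and_probability = p ** num_hash_functions_per_table
    # OR-amplification: a match in any one table is enough.
    return 1.0 - (1.0 - and_probability) ** num_hash_tables


for p in (0.2, 0.5, 0.8):
    print(p, collision_probability(p, num_hash_tables=5, num_hash_functions_per_table=3))
```

More functions per table lower the probability for dissimilar samples, while more tables raise it again for similar ones, which is why the two parameters are usually tuned together.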






[GitHub] [flink] 1996fanrui commented on pull request #21540: [FLINK-30468][web] Change the SortOrder of BusyRatio to descend by default

2022-12-20 Thread GitBox


1996fanrui commented on PR #21540:
URL: https://github.com/apache/flink/pull/21540#issuecomment-1360942141

   Hi @Myasuka , please help take a look in your free time, thanks~





[jira] [Closed] (FLINK-30409) Support reopening closed metric groups

2022-12-20 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-30409.

Fix Version/s: 1.17.0
   Resolution: Done

master (1.17): 535f02c72f3dfdedd4f40a30cc27364a5f5c2cdf

> Support reopening closed metric groups
> --
>
> Key: FLINK-30409
> URL: https://issues.apache.org/jira/browse/FLINK-30409
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, metricGroup.close() will unregister metrics and the underlying 
> metric groups. If the metricGroup is created again via addGroup(), it will 
> silently fail to create metrics since the metric group is in a closed state.
> We need to close metric groups and reopen them because some of the metrics 
> may reference old objects that are no longer relevant/stale and we need to 
> re-create the metric/metric group to point to the new references. For 
> example, we may close `KafkaSourceReader` to remove a topic partition from 
> assignment and then recreate `KafkaSourceReader` with a different set of 
> topic partitions. The metrics should also reflect that.
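
The close-and-reopen behavior discussed in this issue can be sketched with a toy model. The Python snippet below is not Flink's metrics implementation: `ToyMetricGroup`, `add_group`, `register`, and `close` are hypothetical names, and the reopening rule (return a fresh child when the cached one is closed) is an assumption based on the title of the linked pull request.

```python
class ToyMetricGroup:
    """Hypothetical, simplified metric group used only for illustration."""

    def __init__(self, name):
        self.name = name
        self.closed = False
        self.metrics = {}
        self._children = {}

    def add_group(self, name):
        child = self._children.get(name)
        # Assumed fix: replace a closed child instead of handing it out again.
        if child is None or child.closed:
            child = ToyMetricGroup(name)
            self._children[name] = child
        return child

    def register(self, metric_name, metric):
        # A closed group silently ignores registrations, as the issue describes.
        if self.closed:
            return False
        self.metrics[metric_name] = metric
        return True

    def close(self):
        self.closed = True
        self.metrics.clear()
        for child in self._children.values():
            child.close()


root = ToyMetricGroup("root")
reader = root.add_group("KafkaSourceReader")
reader.register("records", 0)
reader.close()

reopened = root.add_group("KafkaSourceReader")
print(reopened is reader, reopened.register("records", 0))
```

Without the replacement step in `add_group`, the second lookup would return the closed group and the registration would be dropped, which is the silent failure the issue reports.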





[GitHub] [flink] xintongsong closed pull request #21539: [FLINK-30409][Runtime / Metrics] Return a new metric group when creating closed metric g…

2022-12-20 Thread GitBox


xintongsong closed pull request #21539: [FLINK-30409][Runtime / Metrics] Return 
a new metric group when creating closed metric g…
URL: https://github.com/apache/flink/pull/21539





[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2022-12-20 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650103#comment-17650103
 ] 

Hang Ruan commented on FLINK-25509:
---

Hi, [~lindong] ,

I have read the FLIP and I am willing to help implement this feature. Would 
you mind assigning this ticket to me? 

Thanks~

 

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> This feature is needed to migrate applications which use 
> KafkaDeserializationSchema::isEndOfStream() from using FlinkKafkaConsumer to 
> using KafkaSource.
> Please check out 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.
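
As a rough illustration of the idea in the ticket title, the sketch below stops reading once a de-serialized record is judged to mark the end of the stream. The nested `RecordEvaluator` interface and the `readUntilEnd` helper are assumptions made for this example; the authoritative interface is the one defined in the FLIP. Whether the end-of-stream record itself is emitted is also an assumption here (it is dropped).

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class RecordEvaluatorSketch {

    /** Hypothetical evaluator shape; see FLIP-208 for the actual proposal. */
    public interface RecordEvaluator<T> extends Serializable {
        boolean isEndOfStream(T record);
    }

    /**
     * Collects records until the evaluator reports end of stream.
     * Assumption: the record that triggers the stop is not emitted.
     */
    public static <T> List<T> readUntilEnd(Iterable<T> records, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : records) {
            if (evaluator.isEndOfStream(record)) {
                break;
            }
            emitted.add(record);
        }
        return emitted;
    }
}
```

The point of the design is that the stop decision is made on the de-serialized record, so it can live outside the deserialization schema.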





[jira] [Commented] (FLINK-30465) Remove flink-connector-jdbc from master branch

2022-12-20 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650102#comment-17650102
 ] 

Ran Tao commented on FLINK-30465:
-

Hi, Martijn. I found that the parent pom of the external flink-connector-jdbc 
uses a non-standard groupId based on a GitHub username. Can we use the Apache 
groupId instead? 

FYI.

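```
<parent>
    <groupId>io.github.zentol.flink</groupId>
    <artifactId>flink-connector-parent</artifactId>
    <version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc-parent</artifactId>
<version>3.1-SNAPSHOT</version>
```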

 

 

> Remove flink-connector-jdbc from master branch
> --
>
> Key: FLINK-30465
> URL: https://issues.apache.org/jira/browse/FLINK-30465
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> With JDBC being synchronized to 
> https://github.com/apache/flink-connector-jdbc we can now remove the JDBC 
> connector from {{master}} so it won't be included in the 1.17 release anymore





[jira] [Comment Edited] (FLINK-30465) Remove flink-connector-jdbc from master branch

2022-12-20 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650102#comment-17650102
 ] 

Ran Tao edited comment on FLINK-30465 at 12/21/22 6:49 AM:
---

Hi, [~martijnvisser]. I found that the parent pom of the external 
flink-connector-jdbc uses a non-standard groupId based on a GitHub username. 
Can we use the Apache groupId instead? 

FYI.

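```
<parent>
    <groupId>io.github.zentol.flink</groupId>
    <artifactId>flink-connector-parent</artifactId>
    <version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc-parent</artifactId>
<version>3.1-SNAPSHOT</version>
```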

 

 


was (Author: lemonjing):
Hi, Martijn.  i found the external flink-connector-jdbc parent pom groupId is 
not standard by using github username. Can we use apache group id? 

FYI.

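```
<parent>
    <groupId>io.github.zentol.flink</groupId>
    <artifactId>flink-connector-parent</artifactId>
    <version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc-parent</artifactId>
<version>3.1-SNAPSHOT</version>
```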

 

 

> Remove flink-connector-jdbc from master branch
> --
>
> Key: FLINK-30465
> URL: https://issues.apache.org/jira/browse/FLINK-30465
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> With JDBC being synchronized to 
> https://github.com/apache/flink-connector-jdbc we can now remove the JDBC 
> connector from {{master}} so it won't be included in the 1.17 release anymore





[GitHub] [flink-kubernetes-operator] nowke commented on pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

2022-12-20 Thread GitBox


nowke commented on PR #487:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/487#issuecomment-1360919023

   > @gyfora how do you think we should fix this?
   
   @morhidi @gyfora I added further validations with configs set through 
`flinkConfiguration`. It does make the validation logic a little complicated. 
Do you suggest any alternative approach?
   
   Besides helm validations, the deployment gets stuck in `UPGRADING` state due 
to `ClusterDeploymentException`. Should the deployment fail instead after 
encountering `ClusterDeploymentException` a fixed number of times? Even after 
the validations in this PR, the deployment can still go into this state if 
insufficient memory is configured.





[GitHub] [flink-connector-elasticsearch] dingweiqings commented on a diff in pull request #48: [FLINK-28177] for test containers add elasticsearch service really up check

2022-12-20 Thread GitBox


dingweiqings commented on code in PR #48:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/48#discussion_r1054014813


##
flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java:
##
@@ -65,13 +65,20 @@
 import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
 import static org.apache.flink.table.api.Expressions.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.containers.wait.strategy.Wait.forHttp;
 
 /** IT tests for {@link Elasticsearch6DynamicSink}. */
 public class Elasticsearch6DynamicSinkITCase extends TestLogger {
 
     @ClassRule
     public static ElasticsearchContainer elasticsearchContainer =
-            new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_6));
+            new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_6))
+                    .waitingFor(
+                            forHttp("/")
+                                    .withMethod("HEAD")
+                                    .forStatusCode(200)
+                                    .forPort(9200)
+                                    .withStartupTimeout(Duration.ofMinutes(5)));

Review Comment:
   Updated. 
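
For readers unfamiliar with the pattern, the wait strategy in the diff above boils down to polling a readiness probe until it succeeds or a startup timeout expires. The following is a minimal, library-free sketch of that idea; `ReadinessWaitSketch` and `waitUntilReady` are hypothetical names for this note and are not part of Testcontainers. In the real test the probe would be the HTTP `HEAD /` check on port 9200.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class ReadinessWaitSketch {

    /**
     * Polls the probe until it returns true or the timeout elapses.
     * Returns true if the service became ready in time, false otherwise
     * (including when the waiting thread is interrupted).
     */
    public static boolean waitUntilReady(
            BooleanSupplier probe, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (probe.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The probe is always tried at least once, so a service that is already up returns immediately without sleeping.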






[GitHub] [flink] flinkbot commented on pull request #21540: [FLINK-30468][web] Change the SortOrder of BusyRatio to descend by default

2022-12-20 Thread GitBox


flinkbot commented on PR #21540:
URL: https://github.com/apache/flink/pull/21540#issuecomment-1360875603

   
   ## CI report:
   
   * 60a63d238f64aff9a94dc4cda3a1f21f8845887a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] 1996fanrui commented on pull request #21540: [FLINK-30468][web] Change the SortOrder of BusyRatio to descend by default

2022-12-20 Thread GitBox


1996fanrui commented on PR #21540:
URL: https://github.com/apache/flink/pull/21540#issuecomment-1360874693

   After this PR: 
   
   1. The SortOrder of BusyRatio will be descending by default.
   
   https://user-images.githubusercontent.com/38427477/208828005-d902a419-a197-4e55-be00-0b086cf89436.png
   
   2. It will sort by subtask when the user chooses not to sort.
   
   https://user-images.githubusercontent.com/38427477/208828189-3702ece1-a77e-43b8-b08f-0974ec4097b7.png
   
   





[jira] [Updated] (FLINK-30468) The SortOrder of BusyRatio should be descend by default

2022-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30468:
---
Labels: pull-request-available  (was: )

> The SortOrder of BusyRatio should be descend by default
> ---
>
> Key: FLINK-30468
> URL: https://issues.apache.org/jira/browse/FLINK-30468
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Reporter: Rui Fan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the sort order is ascending by default; it should be descending.
> The busiest subtask should be displayed on top.





[GitHub] [flink] 1996fanrui opened a new pull request, #21540: [FLINK-30468][web] Change the SortOrder of BusyRatio to descend by default

2022-12-20 Thread GitBox


1996fanrui opened a new pull request, #21540:
URL: https://github.com/apache/flink/pull/21540

   ## What is the purpose of the change
   
   Currently, the sort order is ascending by default; it should be descending.
   
   The busiest subtask should be displayed on top.
   
   ## Brief change log
   
   - Change the SortOrder of BusyRatio to descending by default.
   - Don't change the order of `listOfSubTaskBackpressure`, so that it is sorted 
by subtask when the user chooses not to sort.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented? not documented
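
The intended ordering can be sketched as follows. The actual change is in the web frontend; this Java version is only an illustration, and `BusyRatioSortSketch`, `Subtask` and `SortOrder` are hypothetical names. It assumes that "not sorted" means falling back to subtask index order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BusyRatioSortSketch {

    public enum SortOrder { ASCEND, DESCEND, NONE }

    /** Hypothetical row of the back pressure table. */
    public static final class Subtask {
        public final int index;
        public final double busyRatio;

        public Subtask(int index, double busyRatio) {
            this.index = index;
            this.busyRatio = busyRatio;
        }
    }

    /** Returns a sorted copy; the input list is left untouched. */
    public static List<Subtask> sort(List<Subtask> subtasks, SortOrder order) {
        List<Subtask> result = new ArrayList<>(subtasks);
        Comparator<Subtask> byBusyRatio = Comparator.comparingDouble(s -> s.busyRatio);
        Comparator<Subtask> bySubtask = Comparator.comparingInt(s -> s.index);
        switch (order) {
            case ASCEND:
                result.sort(byBusyRatio);
                break;
            case DESCEND:
                result.sort(byBusyRatio.reversed());
                break;
            default:
                // No explicit sort chosen: fall back to subtask order (assumption).
                result.sort(bySubtask);
                break;
        }
        return result;
    }

    /** Default view: busiest subtask first. */
    public static List<Subtask> sortDefault(List<Subtask> subtasks) {
        return sort(subtasks, SortOrder.DESCEND);
    }
}
```

Sorting a copy mirrors the second change log item: the underlying list keeps its subtask order regardless of what the user selects.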
   





[jira] [Created] (FLINK-30468) The SortOrder of BusyRatio should be descend by default

2022-12-20 Thread Rui Fan (Jira)
Rui Fan created FLINK-30468:
---

 Summary: The SortOrder of BusyRatio should be descend by default
 Key: FLINK-30468
 URL: https://issues.apache.org/jira/browse/FLINK-30468
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Reporter: Rui Fan


Currently, the sort order is ascending by default; it should be descending.

The busiest subtask should be displayed on top.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

2022-12-20 Thread GitBox


gyfora commented on code in PR #489:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053989089


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java:
##
@@ -260,11 +265,96 @@ private FlinkDeployment cloneDeploymentWithUpgradeMode(
     }
 
     @ParameterizedTest
-    @EnumSource(UpgradeMode.class)
-    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+    @MethodSource("testUpgradeJmDeployCannotStartParams")
+    public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+            throws Exception {

Review Comment:
   This test is indeed a bit too complex. I will try to add some extra 
methods and comments to make it clearer. 






[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053974995


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/SwingModelData.java:
##
@@ -0,0 +1,106 @@
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class SwingModelData {
+   public String mainItem;
+   public ArrayList  items;

Review Comment:
   Thanks for your suggestion. Using `List` results in a `ClassCastException` in 
`SwingModelData.getDataStream()`. I have no idea why it doesn't work.






[GitHub] [flink] xintongsong commented on a diff in pull request #21496: [FLINK-29869][ResourceManager] make ResourceAllocator declarative.

2022-12-20 Thread GitBox


xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1053928976


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##
@@ -206,15 +220,29 @@ protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID reso
         return Optional.ofNullable(workerNodeMap.get(resourceID));
     }
 
+    @VisibleForTesting
+    public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
+        for (ResourceDeclaration resourceDeclaration : resourceDeclarations) {
+            updateResourceDeclaration(resourceDeclaration);
+        }
+
+        checkResourceDeclarations();
+    }
+
     @Override
-    protected void onWorkerRegistered(WorkerType worker) {
+    protected void onWorkerRegistered(WorkerType worker, WorkerResourceSpec workerResourceSpec) {
         final ResourceID resourceId = worker.getResourceID();
         log.info("Worker {} is registered.", resourceId.getStringWithMetadata());
 
-        final WorkerResourceSpec workerResourceSpec =
-                currentAttemptUnregisteredWorkers.remove(resourceId);
         tryRemovePreviousPendingRecoveryTaskManager(resourceId);
-        if (workerResourceSpec != null) {
+
+        if (!workerResourceSpecs.containsKey(worker.getResourceID())) {
+            // recovered worker registered.
+            workerResourceSpecs.put(worker.getResourceID(), workerResourceSpec);
+            totalWorkerCounter.increaseAndGet(workerResourceSpec);
+        }

Review Comment:
   Might be better to also add a log in this if-branch.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
 onFatalError(exception);
 }
 
+@VisibleForTesting
+public int releaseUnWantedResources(

Review Comment:
   Move this method to the *Internal* section.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
     // 
     //  Internal
     // 
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    private void updateResourceDeclaration(ResourceDeclaration newResourceDeclaration) {
+        WorkerResourceSpec workerResourceSpec = newResourceDeclaration.getSpec();
+        ResourceDeclaration oldDeclaration = this.resourceDeclarations.get(workerResourceSpec);
+
+        Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+        if (oldDeclaration != null) {
+            oldDeclaration
+                    .getUnwantedWorkers()
+                    .forEach(
+                            instanceID -> {
+                                if (getWorkerByInstanceId(instanceID) != null) {
+                                    filteredUnwantedWorkers.add(instanceID);
+                                }
+                            });
+        }
+
+        newResourceDeclaration
+                .getUnwantedWorkers()
+                .forEach(
+                        instanceID -> {
+                            if (getWorkerByInstanceId(instanceID) != null) {
+                                filteredUnwantedWorkers.add(instanceID);
+                            }
+                        });
+
+        this.resourceDeclarations.put(
+                workerResourceSpec,
+                new ResourceDeclaration(
+                        workerResourceSpec,
+                        newResourceDeclaration.getNumNeeded(),
+                        filteredUnwantedWorkers));
+    }
+
+    private void checkResourceDeclarations() {
+        for (ResourceDeclaration resourceDeclaration : resourceDeclarations.values()) {
+            WorkerResourceSpec workerResourceSpec = resourceDeclaration.getSpec();
+            int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+            final int releaseOrRequestWorkerNumber =
+                    totalWorkerCounter.getNum(workerResourceSpec) - declaredWorkerNumber;
+
+            if 

[GitHub] [flink-ml] Fanoid commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


Fanoid commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053958500


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/Swing.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An Estimator which implements the Swing algorithm.
+ *
+ * Swing is an item recall model. The topology of user-item graph usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if both user u and
+ * user v have purchased the same commodity i, they will form a relationship diagram similar
+ * to a swing. If u and v have purchased commodity j in addition to i, it is supposed i and j
+ * are similar.
+ */
+public class Swing implements Estimator<Swing, SwingModel>, SwingParams<Swing> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Swing() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public SwingModel fit(Table... inputs) {
+
+        final String userCol = getUserCol();
+        final String itemCol = getItemCol();
+        final int minUserItems = getMinUserItems();
+        final int maxUserItems = getMaxUserItems();
+        final ResolvedSchema schema = ((TableImpl) inputs[0]).getResolvedSchema();
+        final LogicalType userColType = schema.getColumn(userCol).get().getDataType().getLogicalType();
+        final LogicalType itemColType = schema.getColumn(itemCol).get().getDataType().getLogicalType();
+
+        if (!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(userColType).toLogicalType()) ||
+            !TypeCheckUtils.isCharacterString(InternalTypeInfo.of(itemColType).toLogicalType())) {
+            throw new IllegalArgumentException("Type of user and item column must be string.");
+        }
+
+        StreamTableEnvironment tEnv =
+            (StreamTableEnvironment) ((TableImpl) inputs[0])
+                .getTableEnvironment();
+
+        SingleOutputStreamOperator<Tuple2<String, String>> itemUsers =
+            tEnv.toDataStream(inputs[0])
+                .map(row -> Tuple2.of((String) row.getFieldAs(userCol), (String) 

[GitHub] [flink-ml] Fanoid commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


Fanoid commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053957894


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/Swing.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An Estimator which implements the Swing algorithm.
+ *
+ * Swing is an item recall model. The topology of user-item graph usually 
can be described as user-item-user or
+ * item-user-item, which are like 'swing'. For example, if both user 
u and user v have purchased the
+ * same commodity i , they will form a relationship diagram similar 
to a swing. If u and v
+ * have purchased commodity j in addition to i, it is 
supposed i and j are
+ * similar.
+ */
+public class Swing implements Estimator , SwingParams 
 {
+   private final Map , Object> paramMap = new HashMap <>();
+
+   public Swing() {
+   ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+   }
+
+   @Override
+   public SwingModel fit(Table... inputs) {
+
+   final String userCol = getUserCol();
+   final String itemCol = getItemCol();
+   final int minUserItems = getMinUserItems();
+   final int maxUserItems = getMaxUserItems();
+   final ResolvedSchema schema = ((TableImpl) inputs[0]).getResolvedSchema();
+   final LogicalType userColType = schema.getColumn(userCol).get().getDataType().getLogicalType();
+   final LogicalType itemColType = schema.getColumn(itemCol).get().getDataType().getLogicalType();
+
+   if (!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(userColType).toLogicalType()) ||
+   !TypeCheckUtils.isCharacterString(InternalTypeInfo.of(itemColType).toLogicalType())) {
+   throw new IllegalArgumentException("Type of user and item column must be string.");
+   }
+
+   StreamTableEnvironment tEnv =
+   (StreamTableEnvironment) ((TableImpl) inputs[0])
+   .getTableEnvironment();
+
+   SingleOutputStreamOperator<Tuple2<String, String>> itemUsers =
+   tEnv.toDataStream(inputs[0])
+   .map(row -> Tuple2.of((String) 
row.getFieldAs(userCol), (String) 

[GitHub] [flink] fsk119 merged pull request #21433: [hotfix][table-planner] Fix inconsistent test data for temporal join

2022-12-20 Thread GitBox


fsk119 merged PR #21433:
URL: https://github.com/apache/flink/pull/21433


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #191: [FLINK-30401] Add Estimator and Transformer for MinHashLSH

2022-12-20 Thread GitBox


zhipeng93 commented on code in PR #191:
URL: https://github.com/apache/flink-ml/pull/191#discussion_r1053903196


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSH.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for estimators which implement LSH (Locality-sensitive hashing) algorithms.
+ *
+ * The basic idea of LSH algorithms is to use a family of hash functions to map data samples
+ * to buckets, where closer samples are expected to be in the same buckets with higher
+ * probabilities, and vice versa. AND-amplification and OR-amplification are utilized to increase
+ * the recall and precision when searching close samples.
+ *
+ * An LSH algorithm is specified by its mapping function and corresponding distance metric (see
+ * {@link LSHScheme}).
+ *
+ * See: <a href="https://en.wikipedia.org/wiki/Locality-sensitive_hashing">Locality-sensitive_hashing</a>.
+ *
+ * @param <E> class type of the Estimator implementation itself.
+ * @param <M> class type of the Model this Estimator produces.
+ */
+abstract class LSH<E extends Estimator<E, M>, M extends LSHModel<M>>
+implements Estimator<E, M>, LSHParams<E> {
+private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+public LSH() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public M fit(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+DataStream<Integer> inputDim = getVectorSize(tEnv.toDataStream(inputs[0]), getInputCol());
+return createModel(inputDim, tEnv);
+}
+
+protected abstract M createModel(DataStream<Integer> inputDim, StreamTableEnvironment tEnv);
+
+private static DataStream<Integer> getVectorSize(DataStream<Row> input, String vectorCol) {
+DataStream<Integer> vecSizeDataStream =

Review Comment:
   nit: `vecSizeDataStream` --> `vectorSizes`
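
As a rough illustration of the bucket idea described in the quoted Javadoc (this is not the code under review), here is a minimal MinHash sketch. The class name `MinHashSketch`, the modulus 2^31 - 1 and the linear hash family `(a * x + b) mod p` are assumptions chosen for the example. AND/OR amplification is left out.

```java
import java.util.Arrays;
import java.util.Random;

/** Hypothetical illustration of MinHash signatures; not the flink-ml implementation. */
final class MinHashSketch {
    // 2^31 - 1 is a Mersenne prime; using it as the modulus is an assumption of this sketch.
    private static final long PRIME = (1L << 31) - 1;

    private final long[] a;
    private final long[] b;

    MinHashSketch(int numHashFunctions, long seed) {
        Random rnd = new Random(seed);
        a = new long[numHashFunctions];
        b = new long[numHashFunctions];
        for (int i = 0; i < numHashFunctions; i++) {
            a[i] = 1 + rnd.nextInt(Integer.MAX_VALUE - 1); // in [1, PRIME - 1]
            b[i] = rnd.nextInt(Integer.MAX_VALUE);         // in [0, PRIME - 1]
        }
    }

    /** One minimum per hash function, taken over the non-negative indices of the non-zero entries. */
    long[] signature(int[] nonZeroIndices) {
        long[] sig = new long[a.length];
        Arrays.fill(sig, Long.MAX_VALUE);
        for (int i = 0; i < a.length; i++) {
            for (int index : nonZeroIndices) {
                long h = (a[i] * (index + 1L) + b[i]) % PRIME;
                sig[i] = Math.min(sig[i], h);
            }
        }
        return sig;
    }

    /** Fraction of signature positions that agree; an estimate of the Jaccard similarity. */
    static double estimatedJaccard(long[] s1, long[] s2) {
        int same = 0;
        for (int i = 0; i < s1.length; i++) {
            if (s1[i] == s2[i]) {
                same++;
            }
        }
        return (double) same / s1.length;
    }

    public static void main(String[] args) {
        MinHashSketch minHash = new MinHashSketch(64, 2022L);
        long[] x = minHash.signature(new int[] {1, 5, 9, 12});
        long[] y = minHash.signature(new int[] {1, 5, 9, 40});
        // The two sets share 3 of 5 distinct indices, so the estimate should land near 0.6.
        System.out.println(estimatedJaccard(x, y));
    }
}
```

Samples whose signatures agree in more positions are more likely to share a bucket, which is the property the Javadoc refers to.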



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/lsh/LSHScheme.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.lsh;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+/**
+ * Interface for an LSH scheme. An LSH scheme should implement how to map a feature vector to
+ * multiple hash vectors, and how to calculate corresponding distance between two feature vectors.
+ */
+interface LSHScheme {

Review Comment:
   How about renaming it as `LSHModelData` and making it an abstract class? 
Since different LSH model data are indeed implementations of this 
interface/class. 
   
   In this case the class 

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


tsreaper commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053935750


##
docs/content/docs/features/table-types.md:
##
@@ -0,0 +1,142 @@
+---
+title: "Table Types"
+weight: 1
+type: docs
+aliases:
+- /features/table-types.html
+---
+
+
+# Table Types
+
+Table Store supports various types of tables. Users can set the `write-mode` table property to choose the table type when creating a table.
+
+## Changelog Tables with Primary Keys
+
+Changelog table is the default table type when creating a table. Users can 
also specify `'write-mode' = 'change-log'` explicitly in table properties when 
creating the table.
+
+Primary keys are a set of columns that are unique for each record. Table Store 
imposes an ordering of data, which means the system will sort the primary key 
within each bucket. Using this feature, users can achieve high performance by 
adding filter conditions on the primary key.
+
+By [defining primary keys]({{< ref 
"docs/sql-api/creating-tables#tables-with-primary-keys" >}}) on a changelog 
table, users can access the following features.
+
+### Merge Engines
+
+When Table Store sink receives two or more records with the same primary keys, 
it will merge them into one record to keep primary keys unique. By specifying 
the `merge-engine` table property, users can choose how records are merged 
together.
+
+#### Deduplicate
+
+`deduplicate` merge engine is the default merge engine. Table Store will only 
keep the latest record and throw away other records with the same primary keys.
+
+Specifically, if the latest record is a `DELETE` record, all records with the 
same primary keys will be deleted.
+
+#### Partial Update
+
+By specifying `'merge-engine' = 'partial-update'`, users can set columns of a 
record across multiple updates and finally get a complete record. Specifically, 
value fields are updated to the latest data one by one under the same primary 
key, but null values are not overwritten.
+
+For example, let's say Table Store receives three records `<1, 23.0, 10, 
NULL>`, `<1, NULL, NULL, 'This is a book'>` and `<1, 25.2, NULL, NULL>`, where 
the first column is the primary key. The final result will be `<1, 25.2, 10, 
'This is a book'>`.
+
+NOTE: For streaming queries, `partial-update` merge engine must be used 
together with `full-compaction` [changelog producer]({{< ref 
"docs/features/table-types#changelog-producers" >}}).
+
+#### Aggregation
+
+Sometimes users only care about aggregated results. The `aggregation` merge 
engine aggregates each value field with the latest data one by one under the 
same primary key according to the aggregate function.
+
+Each field that is not part of the primary keys must be given an aggregate function, specified by the `fields.<field-name>.aggregate-function` table property. For example, consider the following table definition.
+
+{{< tabs "aggregation-merge-engine-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+product_id BIGINT,
+price DOUBLE,
+sales BIGINT,
+PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+'merge-engine' = 'aggregation',
+'fields.price.aggregate-function' = 'max',
+'fields.sales.aggregate-function' = 'sum'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Field `price` will be aggregated by the `max` function, and field `sales` will 
be aggregated by the `sum` function. Given two input records `<1, 23.0, 15>` 
and `<1, 30.2, 20>`, the final result will be `<1, 30.2, 35>`.
+
+Currently supported aggregate functions and data types are:
+
+* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and 
DOUBLE.
+* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 
DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
+* `last_value` / `last_non_null_value`: support all data types.
+* `listagg`: supports STRING data type.
+* `bool_and` / `bool_or`: support BOOLEAN data type.
+
+### Changelog Producers
+
+Streaming queries will continuously produce latest changes. These changes can 
come from the underlying table files or from an [external log system]({{< ref 
"docs/features/external-log-systems" >}}) like Kafka. Compared to the external 
log system, changes from table files have lower cost but higher latency 
(depending on how often snapshots are created).
+
+By specifying the `changelog-producer` table property when creating the table, 
users can choose the pattern of changes produced from files.
+
+#### None
+
+By default, no extra changelog producer will be applied to the writer of the table. Table Store source can only see the merged changes across snapshots, such as which keys are removed and what the new values of some keys are.
+
+However, these merged changes cannot form a complete changelog, because we 
can't read the old values of the keys directly from them. Merged changes 
require the consumers to "remember" the values of each key and to rewrite the 
values without seeing the old ones.

Review Comment:
   

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


tsreaper commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053934969


##
docs/content/docs/sql-api/writing-tables.md:
##
@@ -0,0 +1,79 @@
+---
+title: "Writing Tables"
+weight: 4
+type: docs
+aliases:
+- /sql-api/writing-tables.html
+---
+
+
+# Writing Tables
+
+## Applying Records/Changes to Tables
+
+{{< tabs "insert-into-example" >}}
+
+{{< tab "Flink" >}}
+
+Use `INSERT INTO` to apply records and changes to tables.

Review Comment:
   `changelog-mode` only affects external log systems like Kafka. There must be 
`before` values from the input.






[jira] (FLINK-15325) Input location preference which affects task distribution may make certain job performance worse

2022-12-20 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15325 ]


Zhu Zhu deleted comment on FLINK-15325:
-

was (Author: flink-jira-bot):
This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Input location preference which affects task distribution may make certain 
> job performance worse 
> -
>
> Key: FLINK-15325
> URL: https://issues.apache.org/jira/browse/FLINK-15325
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Attachments: D58ADB03-7187-46B1-B077-91E5005FD463.png
>
>
> When running TPC-DS jobs in a session cluster, we observed that sometimes 
> tasks are not evenly distributed in TMs. The root cause turned out to be that 
> the downstream tasks tend to be TM or host local with its input tasks. This 
> helps to reduce network shuffle. 
> However, in certain cases, like the topology presented in the attached image, 
> jamming the input task's TM and machine with downstream tasks would affect 
> the performance. In this case, respecting input location preferences is 
> causing troubles more than bringing benefits.
> So I'm wondering whether we should introduce a config so that users can 
> disable input location preferences?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-15325) Input location preference which affects task distribution may make certain job performance worse

2022-12-20 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15325 ]


Zhu Zhu deleted comment on FLINK-15325:
-

was (Author: flink-jira-bot):
This issue was labeled "stale-major" 7 days ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.




[jira] (FLINK-15325) Input location preference which affects task distribution may make certain job performance worse

2022-12-20 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15325 ]


Zhu Zhu deleted comment on FLINK-15325:
-

was (Author: flink-jira-bot):
This major issue is unassigned and itself and all of its Sub-Tasks have not 
been updated for 30 days. So, it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update. Afterwards, 
please remove the label. In 7 days the issue will be deprioritized.



[jira] (FLINK-15325) Input location preference which affects task distribution may make certain job performance worse

2022-12-20 Thread Zhu Zhu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-15325 ]


Zhu Zhu deleted comment on FLINK-15325:
-

was (Author: flink-jira-bot):
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


tsreaper commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053934427


##
docs/content/docs/sql-api/writing-tables.md:
##
@@ -0,0 +1,79 @@
+---
+title: "Writing Tables"
+weight: 4
+type: docs
+aliases:
+- /sql-api/writing-tables.html
+---
+
+
+# Writing Tables
+
+## Applying Records/Changes to Tables
+
+{{< tabs "insert-into-example" >}}
+
+{{< tab "Flink" >}}
+
+Use `INSERT INTO` to apply records and changes to tables.
+
+```sql
+INSERT INTO MyTable SELECT ...
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Overwriting the Whole Table

Review Comment:
   It means removing all records from the table / partition and then adding the 
given records.
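
The overwrite semantics described in this comment can be modelled with a small in-memory sketch. The class `OverwriteSketch` and its method names are hypothetical, and treating a table as a map from partition name to records is a simplifying assumption; it only shows the difference between appending and the two overwrite scopes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of append vs. overwrite; not Table Store code. */
final class OverwriteSketch {
    // Partition name -> records currently stored in that partition.
    private final Map<String, List<String>> partitions = new TreeMap<>();

    /** INSERT INTO: existing records are kept, the given records are appended. */
    void insertInto(String partition, List<String> records) {
        partitions.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
    }

    /** Overwrite one partition: drop only that partition's records, then add the given ones. */
    void overwritePartition(String partition, List<String> records) {
        partitions.remove(partition);
        insertInto(partition, records);
    }

    /** Overwrite the whole table: drop every record, then add the given ones. */
    void overwriteTable(String partition, List<String> records) {
        partitions.clear();
        insertInto(partition, records);
    }

    Map<String, List<String>> contents() {
        return partitions;
    }

    public static void main(String[] args) {
        OverwriteSketch table = new OverwriteSketch();
        table.insertInto("dt=1", Arrays.asList("a", "b"));
        table.insertInto("dt=2", Arrays.asList("c"));
        table.overwritePartition("dt=1", Arrays.asList("x"));
        System.out.println(table.contents()); // {dt=1=[x], dt=2=[c]}
        table.overwriteTable("dt=3", Arrays.asList("y"));
        System.out.println(table.contents()); // {dt=3=[y]}
    }
}
```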






[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


tsreaper commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053934087


##
docs/content/docs/maintenance-actions/write-performance.md:
##
@@ -0,0 +1,118 @@
+---
+title: "Write Performance"
+weight: 1
+type: docs
+aliases:
+- /maintenance-actions/write-performance.html
+---
+
+
+# Write Performance
+
+The performance of Table Store writers is related to the following factors.
+
+## Parallelism
+
+It is recommended that the parallelism of sink should be less than or equal to 
the number of buckets, preferably equal. You can control the parallelism of the 
sink with the `sink.parallelism` table property.
+
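+<table>
+<thead>
+<tr>
+  <th>Option</th>
+  <th>Required</th>
+  <th>Default</th>
+  <th>Type</th>
+  <th>Description</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+  <td>sink.parallelism</td>
+  <td>No</td>
+  <td>(none)</td>
+  <td>Integer</td>
+  <td>Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
+</tr>
+</tbody>
+</table>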
+
+## Number of Sorted Runs to Trigger Compaction
+
+Table Store uses LSM which supports a large number of updates. LSM organizes 
files in several "sorted runs". When querying records from an LSM, all sorted 
runs must be combined to produce a complete view of all records.

Review Comment:
   I guess I'll need to add a separate document to introduce all these concepts.
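
Until such a document exists, the statement about sorted runs can be pictured with a minimal sketch. `SortedRunSketch` is a hypothetical name, and modelling each run as an in-memory sorted map (with newer runs taking precedence for the same key) is an assumption made for illustration; a real LSM reader would stream-merge files instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical illustration of combining sorted runs into one view; not Table Store code. */
final class SortedRunSketch {

    /**
     * Combines all runs into a single sorted view. Runs are passed oldest first, so a value
     * from a later run replaces the value an earlier run holds for the same key.
     */
    static SortedMap<Integer, String> mergedView(List<SortedMap<Integer, String>> runsOldestFirst) {
        SortedMap<Integer, String> view = new TreeMap<>();
        for (SortedMap<Integer, String> run : runsOldestFirst) {
            view.putAll(run);
        }
        return view;
    }

    public static void main(String[] args) {
        SortedMap<Integer, String> older = new TreeMap<>();
        older.put(1, "a");
        older.put(3, "c");
        SortedMap<Integer, String> newer = new TreeMap<>();
        newer.put(1, "a2");
        newer.put(2, "b");
        System.out.println(mergedView(Arrays.asList(older, newer))); // {1=a2, 2=b, 3=c}
    }
}
```

Every run has to be consulted to answer a query, which is why the number of sorted runs matters for read cost.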






[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


tsreaper commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053932615


##
docs/content/docs/features/table-types.md:
##
@@ -0,0 +1,142 @@
+---
+title: "Table Types"
+weight: 1
+type: docs
+aliases:
+- /features/table-types.html
+---
+
+
+# Table Types
+
+Table Store supports various types of tables. Users can set the `write-mode` table property to choose the table type when creating a table.
+
+## Changelog Tables with Primary Keys
+
+Changelog table is the default table type when creating a table. Users can 
also specify `'write-mode' = 'change-log'` explicitly in table properties when 
creating the table.
+
+Primary keys are a set of columns that are unique for each record. Table Store 
imposes an ordering of data, which means the system will sort the primary key 
within each bucket. Using this feature, users can achieve high performance by 
adding filter conditions on the primary key.
+
+By [defining primary keys]({{< ref 
"docs/sql-api/creating-tables#tables-with-primary-keys" >}}) on a changelog 
table, users can access the following features.
+
+### Merge Engines
+
+When Table Store sink receives two or more records with the same primary keys, 
it will merge them into one record to keep primary keys unique. By specifying 
the `merge-engine` table property, users can choose how records are merged 
together.
+
+#### Deduplicate
+
+`deduplicate` merge engine is the default merge engine. Table Store will only 
keep the latest record and throw away other records with the same primary keys.
+
+Specifically, if the latest record is a `DELETE` record, all records with the 
same primary keys will be deleted.
+
+#### Partial Update
+
+By specifying `'merge-engine' = 'partial-update'`, users can set columns of a 
record across multiple updates and finally get a complete record. Specifically, 
value fields are updated to the latest data one by one under the same primary 
key, but null values are not overwritten.
+
+For example, let's say Table Store receives three records `<1, 23.0, 10, 
NULL>`, `<1, NULL, NULL, 'This is a book'>` and `<1, 25.2, NULL, NULL>`, where 
the first column is the primary key. The final result will be `<1, 25.2, 10, 
'This is a book'>`.
+
+NOTE: For streaming queries, `partial-update` merge engine must be used 
together with `full-compaction` [changelog producer]({{< ref 
"docs/features/table-types#changelog-producers" >}}).
+
+#### Aggregation
+
+Sometimes users only care about aggregated results. The `aggregation` merge 
engine aggregates each value field with the latest data one by one under the 
same primary key according to the aggregate function.
+
+Each field that is not part of the primary keys must be given an aggregate function, specified by the `fields.<field-name>.aggregate-function` table property. For example, consider the following table definition.
+
+{{< tabs "aggregation-merge-engine-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+product_id BIGINT,
+price DOUBLE,
+sales BIGINT,
+PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+'merge-engine' = 'aggregation',
+'fields.price.aggregate-function' = 'max',
+'fields.sales.aggregate-function' = 'sum'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Field `price` will be aggregated by the `max` function, and field `sales` will 
be aggregated by the `sum` function. Given two input records `<1, 23.0, 15>` 
and `<1, 30.2, 20>`, the final result will be `<1, 30.2, 35>`.
+
+Currently supported aggregate functions and data types are:
+
+* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and 
DOUBLE.
+* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 
DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
+* `last_value` / `last_non_null_value`: support all data types.
+* `listagg`: supports STRING data type.
+* `bool_and` / `bool_or`: support BOOLEAN data type.
+
+### Changelog Producers
+
+Streaming queries will continuously produce latest changes. These changes can 
come from the underlying table files or from an [external log system]({{< ref 
"docs/features/external-log-systems" >}}) like Kafka. Compared to the external 
log system, changes from table files have lower cost but higher latency 
(depending on how often snapshots are created).
+
+By specifying the `changelog-producer` table property when creating the table, 
users can choose the pattern of changes produced from files.

Review Comment:
   It does not affect Kafka. Kafka always stores the complete input records 
just like the `input` changelog producer. I'll make it clear.
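
For readers following the quoted documentation, the `partial-update` and `aggregation` merge rules can be sketched as below, using the example records from the quoted text. `MergeEngineSketch` and its methods are hypothetical names, records are modelled as plain `Object[]` rows with the primary key in column 0, and `DELETE` records are not modelled.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of two merge rules; not Table Store code. */
final class MergeEngineSketch {

    /** partial-update: a non-null incoming field replaces the stored one, a null leaves it as is. */
    static Object[] partialUpdate(List<Object[]> recordsInArrivalOrder) {
        Object[] merged = null;
        for (Object[] record : recordsInArrivalOrder) {
            if (merged == null) {
                merged = record.clone();
                continue;
            }
            for (int i = 0; i < record.length; i++) {
                if (record[i] != null) {
                    merged[i] = record[i];
                }
            }
        }
        return merged;
    }

    /** aggregation for the quoted table: column 1 (price) uses max, column 2 (sales) uses sum. */
    static Object[] aggregateMaxSum(List<Object[]> recordsInArrivalOrder) {
        Object[] merged = null;
        for (Object[] record : recordsInArrivalOrder) {
            if (merged == null) {
                merged = record.clone();
                continue;
            }
            merged[1] = Math.max((Double) merged[1], (Double) record[1]);
            merged[2] = (Long) merged[2] + (Long) record[2];
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Object[]> updates = Arrays.<Object[]>asList(
                new Object[] {1, 23.0, 10, null},
                new Object[] {1, null, null, "This is a book"},
                new Object[] {1, 25.2, null, null});
        System.out.println(Arrays.toString(partialUpdate(updates))); // [1, 25.2, 10, This is a book]

        List<Object[]> sales = Arrays.<Object[]>asList(
                new Object[] {1L, 23.0, 15L},
                new Object[] {1L, 30.2, 20L});
        System.out.println(Arrays.toString(aggregateMaxSum(sales))); // [1, 30.2, 35]
    }
}
```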






[jira] [Comment Edited] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

2022-12-20 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650050#comment-17650050
 ] 

luoyuxia edited comment on FLINK-27640 at 12/21/22 2:41 AM:


I think it can safely be excluded, since 
org.pentaho:pentaho-aggdesigner-algorithm is just a third-level transitive 
dependency (hive-exec -> calcite-core -> pentaho-aggdesigner-algorithm) and I 
think we won't need it.


was (Author: luoyuxia):
I think it can safely be excluded since 
org.pentaho:pentaho-aggdesigner-algorithm is just a third level transitive 
dependency (hive-exec - > calcite-core -> pentaho-aggdesigner-algorithm) and I 
think we won't need it.

> Flink not compiling, flink-connector-hive_2.12 is missing jhyde 
> pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
> --
>
> Key: FLINK-27640
> URL: https://issues.apache.org/jira/browse/FLINK-27640
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
>
> When clean installing whole project after cleaning local {{.m2}} directory I 
> encountered the following error when compiling flink-connector-hive_2.12:
> {noformat}
> [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could 
> not resolve dependencies for project 
> org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to 
> collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
> artifact descriptor for 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
> artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
> maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for 
> repositories: [conjars (http://conjars.org/repo, default, 
> releases+snapshots), apache.snapshots 
> (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
> {noformat}
> I've solved this by adding 
> {noformat}
> 
> spring-repo-plugins
> https://repo.spring.io/ui/native/plugins-release/
> 
> {noformat}
> to ~/.m2/settings.xml file. 





[GitHub] [flink] chucheng92 commented on pull request #21464: [FLINK-30334][runtime] Fix SourceCoordinator#handleRequestSplitEvent hasNoMoreSplits check not consider the hybridsource situation

2022-12-20 Thread GitBox


chucheng92 commented on PR #21464:
URL: https://github.com/apache/flink/pull/21464#issuecomment-1360734949

   @flinkbot run azure





[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

2022-12-20 Thread GitBox


xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1053915899


##
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##
@@ -1344,6 +1344,13 @@ public CompletableFuture> 
listCompletedClusterDatasetIds() {
 metaInfoMap -> new 
HashSet<>(metaInfoMap.keySet(;
 }
 
+public CompletableFuture reportHeartbeat(JobID jobId, long 
expiredTimestamp) {
+return runDispatcherCommand(
+dispatcherGateway ->
+dispatcherGateway.reportJobClientHeartbeat(
+jobId, expiredTimestamp, rpcTimeout));
+}

Review Comment:
   I'm fine with keeping it if that helps with testing.






[GitHub] [flink] fsk119 commented on pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

2022-12-20 Thread GitBox


fsk119 commented on PR #21187:
URL: https://github.com/apache/flink/pull/21187#issuecomment-1360718894

   @flinkbot run azure





[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

2022-12-20 Thread GitBox


xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1053913714


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Receive the heartbeat from the client. */
+public class JobClientHeartbeatHandler
+extends AbstractRestHandler<
+RestfulGateway,
+JobClientHeartbeatRequestBody,
+EmptyResponseBody,
+JobClientHeartbeatParameters> {
+public JobClientHeartbeatHandler(
+GatewayRetriever leaderRetriever,
+Time timeout,
+Map headers,
+MessageHeaders<
+JobClientHeartbeatRequestBody,
+EmptyResponseBody,
+JobClientHeartbeatParameters>
+messageHeaders) {
+super(leaderRetriever, timeout, headers, messageHeaders);
+}
+
+@Override
+public CompletableFuture handleRequest(
+HandlerRequest request, RestfulGateway gateway)
+throws RestHandlerException {
+final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+gateway.reportJobClientHeartbeat(
+jobId, request.getRequestBody().getExpiredTimestamp(), timeout);

Review Comment:
   We probably don't need to return the CompletableFuture to the client side, 
but the rest handler should check the future returned from the gateway, and 
handle the exception if the future is completed exceptionally.
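
As a rough illustration of the suggestion above, the sketch below chains on the future returned by the gateway and turns an exceptional completion into a handler-level error. `Gateway`, `HandlerException`, and `handle` are hypothetical stand-ins written for this example, not the Flink `RestfulGateway` or `RestHandlerException` API, and the `"{}"` string is only a placeholder for an empty response body.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HeartbeatHandlerSketch {
    /** Hypothetical stand-in for the gateway call; not the Flink API. */
    interface Gateway {
        CompletableFuture<Void> reportHeartbeat(String jobId, long expiredTimestamp);
    }

    /** Hypothetical stand-in for a handler-level exception type. */
    static class HandlerException extends RuntimeException {
        HandlerException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Inspects the gateway future instead of dropping it: a failure is
     * rethrown as a handler-level error, a success yields an empty body.
     */
    static CompletableFuture<String> handle(Gateway gateway, String jobId, long expiredTimestamp) {
        return gateway.reportHeartbeat(jobId, expiredTimestamp)
                .handle((ignored, failure) -> {
                    if (failure != null) {
                        // Surface the gateway failure to the caller of the handler.
                        throw new CompletionException(
                                new HandlerException("Heartbeat for job " + jobId + " failed", failure));
                    }
                    // Placeholder for an empty response body.
                    return "{}";
                });
    }
}
```

The point of the sketch is only that the future returned by the gateway is consumed rather than ignored; how the failure is mapped to an HTTP status is left open here.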






[GitHub] [flink] 1996fanrui commented on a diff in pull request #21447: [FLINK-30185] Provide the flame graph to the subtask level

2022-12-20 Thread GitBox


1996fanrui commented on code in PR #21447:
URL: https://github.com/apache/flink/pull/21447#discussion_r1053913594


##
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##
@@ -32,25 +36,33 @@
  */
 public class ThreadInfoSample implements Serializable {
 
+private final ExecutionAttemptID executionId;

Review Comment:
   Thanks for your quick feedback, I will refactor it asap.






[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053913128


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/Swing.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An Estimator which implements the Swing algorithm.
+ *
+ * Swing is an item recall model. The topology of user-item graph usually can be described as user-item-user or
+ * item-user-item, which are like 'swing'. For example, if both user u and user v have purchased the
+ * same commodity i, they will form a relationship diagram similar to a swing. If u and v
+ * have purchased commodity j in addition to i, it is supposed i and j are
+ * similar.
+ */
+public class Swing implements Estimator , SwingParams 
 {
+   private final Map , Object> paramMap = new HashMap <>();
+
+   public Swing() {
+   ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+   }
+
+   @Override
+   public SwingModel fit(Table... inputs) {
+
+   final String userCol = getUserCol();
+   final String itemCol = getItemCol();
+   final int minUserItems = getMinUserItems();
+   final int maxUserItems = getMaxUserItems();
+   final ResolvedSchema schema = ((TableImpl) inputs[0]).getResolvedSchema();
+   final LogicalType userColType = schema.getColumn(userCol).get().getDataType().getLogicalType();
+   final LogicalType itemColType = schema.getColumn(itemCol).get().getDataType().getLogicalType();
+
+   if (!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(userColType).toLogicalType()) ||
+   !TypeCheckUtils.isCharacterString(InternalTypeInfo.of(itemColType).toLogicalType())) {
+   throw new IllegalArgumentException("Type of user and item column must be string.");
+   }
+
+   StreamTableEnvironment tEnv =
+   (StreamTableEnvironment) ((TableImpl) inputs[0])
+   .getTableEnvironment();
+
+   SingleOutputStreamOperator > itemUsers =
+   tEnv.toDataStream(inputs[0])
+   .map(row -> Tuple2.of((String) 
row.getFieldAs(userCol), (String) 
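
For readers unfamiliar with Swing, the idea in the Javadoc above can be sketched outside Flink as follows. This is a minimal, in-memory illustration of one common formulation of the score, in which the similarity of items i and j is the sum, over every pair of users (u, v) who both purchased i and j, of 1 / (alpha + |I_u ∩ I_v|), where I_u is the set of items purchased by u. It is not the code in this PR; `SwingSketch`, `similarity`, and the `alpha` smoothing parameter are names assumed for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SwingSketch {
    /**
     * Basic Swing score for items i and j (illustrative only):
     * sum over user pairs (u, v) that both purchased i and j of
     * 1 / (alpha + number of items u and v have in common).
     */
    static double similarity(Map<String, Set<String>> userItems, String i, String j, double alpha) {
        // Users who purchased both items.
        List<String> common = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : userItems.entrySet()) {
            if (e.getValue().contains(i) && e.getValue().contains(j)) {
                common.add(e.getKey());
            }
        }
        double score = 0.0;
        // Each unordered pair of such users forms one "swing".
        for (int a = 0; a < common.size(); a++) {
            for (int b = a + 1; b < common.size(); b++) {
                Set<String> overlap = new HashSet<>(userItems.get(common.get(a)));
                overlap.retainAll(userItems.get(common.get(b)));
                // Pairs with a large overlap contribute less to the score.
                score += 1.0 / (alpha + overlap.size());
            }
        }
        return score;
    }
}
```

With users u = {i, j} and v = {i, j, k} and alpha = 1, the single pair (u, v) shares two items, so the score for (i, j) is 1 / (1 + 2).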

[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

2022-12-20 Thread GitBox


tweise commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053912124


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -116,9 +119,23 @@ public final void reconcile(CR cr, Context ctx) throws Exception {
 if (reconciliationStatus.isBeforeFirstDeployment()) {
 LOG.info("Deploying for the first time");
 
+if (spec.getJob() != null) {
+var initialUpgradeMode = UpgradeMode.STATELESS;
+var initialSp = spec.getJob().getInitialSavepointPath();
+
+if (initialSp != null) {
+status.getJobStatus()
+.getSavepointInfo()
+.setLastSavepoint(
+Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+initialUpgradeMode = UpgradeMode.SAVEPOINT;
+}
+
+spec.getJob().setUpgradeMode(initialUpgradeMode);

Review Comment:
   Makes sense. Agreed that it must be possible to find the upgrade mode that 
was actually used. And since an upgrade mode change does not cause 
reconciliation, there is no need to consider it when comparing the specs from 
the user side either. That a new mode was applied is evident from its presence 
in the latest spec.
   






[GitHub] [flink] xintongsong commented on a diff in pull request #21447: [FLINK-30185] Provide the flame graph to the subtask level

2022-12-20 Thread GitBox


xintongsong commented on code in PR #21447:
URL: https://github.com/apache/flink/pull/21447#discussion_r1053911284


##
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##
@@ -32,25 +36,33 @@
  */
 public class ThreadInfoSample implements Serializable {
 
+private final ExecutionAttemptID executionId;

Review Comment:
   I don't think a generic type solves the problem here, because the concept 
of an **owner** is itself not general.






[jira] [Assigned] (FLINK-27940) [JUnit5 Migration] Module: flink-connector-jdbc

2022-12-20 Thread Lijie Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-27940:
--

Assignee: João Boto  (was: Lijie Wang)

> [JUnit5 Migration] Module: flink-connector-jdbc
> ---
>
> Key: FLINK-27940
> URL: https://issues.apache.org/jira/browse/FLINK-27940
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: 1.16.0
>Reporter: Alexander Preuss
>Assignee: João Boto
>Priority: Minor
>  Labels: pull-request-available, starter
>






[GitHub] [flink-connector-jdbc] wanglijie95 commented on pull request #12: [FLINK-27940] Migrate tests to junit5

2022-12-20 Thread GitBox


wanglijie95 commented on PR #12:
URL: https://github.com/apache/flink-connector-jdbc/pull/12#issuecomment-1360686642

   @eskabetxe Please feel free to take it. I will assign it to you, and I can 
help review this PR :)





[GitHub] [flink] lincoln-lil commented on a diff in pull request #21219: [FLINK-29849][table-planner] Fix event time temporal join on an upsert source may produce incorrect execution plan

2022-12-20 Thread GitBox


lincoln-lil commented on code in PR #21219:
URL: https://github.com/apache/flink/pull/21219#discussion_r1053906856


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java:
##
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil;
+
+import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.tools.RuleSets;
+
+/**
+ * Traverses an event time temporal table join {@link RelNode} tree and update the right child to
+ * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent
+ * it generating a new StreamPhysicalChangelogNormalize later.
+ *
+ * the match patterns are as following(8 variants, the three `Calc` nodes are all optional):
+ *
+ * {@code
+ *Join (event time temporal)
+ *  /   \
+ * RelNode [Calc]
+ *   \
+ * Snapshot
+ *\
+ *  [Calc]
+ * \
+ * WatermarkAssigner
+ *  \
+ *[Calc]
+ *   \
+ *TableScan
+ * }
+ *
+ * Note: This rule can only be used in a separate {@link org.apache.calcite.plan.hep.HepProgram}
+ * after `LOGICAL_REWRITE` rule sets are applied for now.
+ */
+public class EventTimeTemporalJoinRewriteRule
+extends RelRule {
+
+public static final RuleSet EVENT_TIME_TEMPORAL_JOIN_REWRITE_RULES =
+RuleSets.ofList(
+Config.JOIN_CALC_SNAPSHOT_CALC_WMA_CALC_TS.toRule(),
+Config.JOIN_CALC_SNAPSHOT_CALC_WMA_TS.toRule(),
+Config.JOIN_CALC_SNAPSHOT_WMA_CALC_TS.toRule(),
+Config.JOIN_CALC_SNAPSHOT_WMA_TS.toRule(),
+Config.JOIN_SNAPSHOT_CALC_WMA_CALC_TS.toRule(),
+Config.JOIN_SNAPSHOT_CALC_WMA_TS.toRule(),
+Config.JOIN_SNAPSHOT_WMA_CALC_TS.toRule(),
+Config.JOIN_SNAPSHOT_WMA_TS.toRule());
+
+public EventTimeTemporalJoinRewriteRule(Config config) {
+super(config);
+}
+
+@Override
+public boolean matches(RelOptRuleCall call) {
+FlinkLogicalJoin join = call.rel(0);
+RexNode joinCondition = join.getCondition();
+// only matches event time temporal join
+return joinCondition != null
+&& TemporalTableJoinUtil.isEventTimeTemporalJoin(joinCondition);
+}
+
+@Override
+public void onMatch(RelOptRuleCall call) {
+FlinkLogicalJoin join = call.rel(0);
+FlinkLogicalRel joinRightChild = call.rel(2);
+RelNode newRight = transmitSnapshotRequirement(joinRightChild);
+call.transformTo(
+join.copy(join.getTraitSet(), Lists.newArrayList(join.getLeft(), newRight)));
+}
+
+private RelNode transmitSnapshotRequirement(RelNode node) {
+if (node instanceof FlinkLogicalCalc) {
+final FlinkLogicalCalc calc = (FlinkLogicalCalc) node;
+// filter is not allowed 

[GitHub] [flink] dianfu closed pull request #21254: Update docker.md

2022-12-20 Thread GitBox


dianfu closed pull request #21254: Update docker.md
URL: https://github.com/apache/flink/pull/21254





[jira] [Commented] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

2022-12-20 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650050#comment-17650050
 ] 

luoyuxia commented on FLINK-27640:
--

I think it can safely be excluded, since 
org.pentaho:pentaho-aggdesigner-algorithm is just a third-level transitive 
dependency (hive-exec -> calcite-core -> pentaho-aggdesigner-algorithm) and I 
don't think we need it.

> Flink not compiling, flink-connector-hive_2.12 is missing jhyde 
> pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
> --
>
> Key: FLINK-27640
> URL: https://issues.apache.org/jira/browse/FLINK-27640
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
>
> When clean installing whole project after cleaning local {{.m2}} directory I 
> encountered the following error when compiling flink-connector-hive_2.12:
> {noformat}
> [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could 
> not resolve dependencies for project 
> org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to 
> collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
> artifact descriptor for 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
> artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
> maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for 
> repositories: [conjars (http://conjars.org/repo, default, 
> releases+snapshots), apache.snapshots 
> (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
> {noformat}
> I've solved this by adding 
> {noformat}
> 
> spring-repo-plugins
> https://repo.spring.io/ui/native/plugins-release/
> 
> {noformat}
> to ~/.m2/settings.xml file. 





[GitHub] [flink] dianfu commented on pull request #21254: Update docker.md

2022-12-20 Thread GitBox


dianfu commented on PR #21254:
URL: https://github.com/apache/flink/pull/21254#issuecomment-1360669458

   Sorry for the late response; I missed this PR. The PR LGTM, and I guess it 
is caused by the bump of the fastavro version introduced in 
https://issues.apache.org/jira/browse/FLINK-27058.





[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053893147


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/SwingModel.java:
##
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.recommendation.SwingModelData.ModelDataDecoder;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Model which recommends item for top-N similar items using the model data computed by {@link Swing}.
+ */
+public class SwingModel implements Model , SwingModelParams 
 {
+   private final Map , Object> paramMap = new HashMap <>();
+   private Table modelDataTable;
+   private final String SWING_ITEM_NAME = "SWING_ITEM_NAME";
+   private final String SWING_ITEM_RANK = "SWING_ITEM_RANK";
+   private final String SWING_ITEM_SCORE = "SWING_ITEM_SCORE";
+
+   public SwingModel() {
+   ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+   }
+
+   @Override
+   public SwingModel setModelData(Table... inputs) {
+   modelDataTable = inputs[0];
+   return this;
+   }
+
+   @Override
+   public Table[] getModelData() {
+   return new Table[] {modelDataTable};
+   }
+
+   @Override
+   public Table[] transform(Table... inputs) {
+   Preconditions.checkArgument(inputs.length == 1);
+   final String predictItemCol = getItemCol();
+   final String broadcastModelKey = "broadcastModelKey";
+
+   StreamTableEnvironment tEnv =
+   (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+   DataStream  data = tEnv.toDataStream(inputs[0]);
+   DataStream  model =SwingModelData.getDataStream(tEnv,modelDataTable);
+
+   RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+   RowTypeInfo outputTypeInfo =
+   new RowTypeInfo(
+   ArrayUtils.addAll(
+   inputTypeInfo.getFieldTypes(),
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+ 

[jira] [Created] (FLINK-30467) Improve the accuracy of estimating the row count of LocalHashAgg to make more two-phase HashAgg operators effective

2022-12-20 Thread Yunhong Zheng (Jira)
Yunhong Zheng created FLINK-30467:
-

 Summary: Improve the accuracy of estimating the row count of 
LocalHashAgg to make more two-phase HashAgg operators effective 
 Key: FLINK-30467
 URL: https://issues.apache.org/jira/browse/FLINK-30467
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Yunhong Zheng
 Fix For: 1.17.0


Improve the accuracy of estimating the row count of LocalHashAgg to make more 
two-phase HashAgg operators effective.





[jira] [Closed] (FLINK-30369) Fix FlinkFilterJoinRule push wrong filters to another side while join type is left/right outer join

2022-12-20 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng closed FLINK-30369.
-
Resolution: Not A Bug

> Fix FlinkFilterJoinRule push wrong filters to another side while join type is 
> left/right outer join
> ---
>
> Key: FLINK-30369
> URL: https://issues.apache.org/jira/browse/FLINK-30369
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.17.0
>
>
> Currently, if the join type is left/right outer join, FlinkFilterJoinRule 
> pushes wrong filters to the other side when extracting filters from join 
> conditions. If the conditions contain IS_NULL or IS_NOT_NULL filters and the 
> join type is left/right outer join, this filter cannot be pushed to the other side.





[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053890313


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/Swing.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An Estimator which implements the Swing algorithm.
+ *
+ * Swing is an item recall model. The topology of user-item graph usually 
can be described as user-item-user or
+ * item-user-item, which are like 'swing'. For example, if both user 
u and user v have purchased the
+ * same commodity i , they will form a relationship diagram similar 
to a swing. If u and v
+ * have purchased commodity j in addition to i, it is 
supposed i and j are
+ * similar.
+ */
+public class Swing implements Estimator<Swing, SwingModel>, SwingParams<Swing> {
+   private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+   public Swing() {
+   ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+   }
+
+   @Override
+   public SwingModel fit(Table... inputs) {
+
+   final String userCol = getUserCol();
+   final String itemCol = getItemCol();
+   final int minUserItems = getMinUserItems();
+   final int maxUserItems = getMaxUserItems();
+   final ResolvedSchema schema = ((TableImpl) 
inputs[0]).getResolvedSchema();
+   final LogicalType userColType = 
schema.getColumn(userCol).get().getDataType().getLogicalType();
+   final LogicalType itemColType = 
schema.getColumn(itemCol).get().getDataType().getLogicalType();
+
+   if 
(!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(userColType).toLogicalType())
 ||
+   
!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(itemColType).toLogicalType()))
 {
+   throw new IllegalArgumentException("Type of user and 
item column must be string.");
+   }
+
+   StreamTableEnvironment tEnv =
+   (StreamTableEnvironment) ((TableImpl) inputs[0])
+   .getTableEnvironment();
+
+   SingleOutputStreamOperator > itemUsers =
+   tEnv.toDataStream(inputs[0])
+   .map(row -> Tuple2.of((String) 
row.getFieldAs(userCol), (String) 

[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053881499


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/Swing.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An Estimator which implements the Swing algorithm.
+ *
+ * Swing is an item recall model. The topology of user-item graph usually 
can be described as user-item-user or
+ * item-user-item, which are like 'swing'. For example, if both user 
u and user v have purchased the
+ * same commodity i , they will form a relationship diagram similar 
to a swing. If u and v
+ * have purchased commodity j in addition to i, it is 
supposed i and j are
+ * similar.
+ */
+public class Swing implements Estimator<Swing, SwingModel>, SwingParams<Swing> {
+   private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+   public Swing() {
+   ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+   }
+
+   @Override
+   public SwingModel fit(Table... inputs) {
+
+   final String userCol = getUserCol();
+   final String itemCol = getItemCol();
+   final int minUserItems = getMinUserItems();
+   final int maxUserItems = getMaxUserItems();
+   final ResolvedSchema schema = ((TableImpl) 
inputs[0]).getResolvedSchema();
+   final LogicalType userColType = 
schema.getColumn(userCol).get().getDataType().getLogicalType();
+   final LogicalType itemColType = 
schema.getColumn(itemCol).get().getDataType().getLogicalType();
+
+   if 
(!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(userColType).toLogicalType())
 ||
+   
!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(itemColType).toLogicalType()))
 {
+   throw new IllegalArgumentException("Type of user and 
item column must be string.");
+   }
+
+   StreamTableEnvironment tEnv =
+   (StreamTableEnvironment) ((TableImpl) inputs[0])
+   .getTableEnvironment();
+
+   SingleOutputStreamOperator > itemUsers =
+   tEnv.toDataStream(inputs[0])
+   .map(row -> Tuple2.of((String) 
row.getFieldAs(userCol), (String) 

[GitHub] [flink-ml] vacaly commented on a diff in pull request #192: [FLINK-30451] Add Estimator and Transformer for Swing

2022-12-20 Thread GitBox


vacaly commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1053881373


##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/Swing.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An Estimator which implements the Swing algorithm.
+ *
+ * Swing is an item recall model. The topology of user-item graph usually 
can be described as user-item-user or
+ * item-user-item, which are like 'swing'. For example, if both user 
u and user v have purchased the
+ * same commodity i , they will form a relationship diagram similar 
to a swing. If u and v
+ * have purchased commodity j in addition to i, it is 
supposed i and j are
+ * similar.
+ */
+public class Swing implements Estimator<Swing, SwingModel>, SwingParams<Swing> {
+   private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+   public Swing() {
+   ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+   }
+
+   @Override
+   public SwingModel fit(Table... inputs) {
+
+   final String userCol = getUserCol();
+   final String itemCol = getItemCol();
+   final int minUserItems = getMinUserItems();
+   final int maxUserItems = getMaxUserItems();
+   final ResolvedSchema schema = ((TableImpl) 
inputs[0]).getResolvedSchema();
+   final LogicalType userColType = 
schema.getColumn(userCol).get().getDataType().getLogicalType();
+   final LogicalType itemColType = 
schema.getColumn(itemCol).get().getDataType().getLogicalType();
+
+   if 
(!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(userColType).toLogicalType())
 ||
+   
!TypeCheckUtils.isCharacterString(InternalTypeInfo.of(itemColType).toLogicalType()))
 {
+   throw new IllegalArgumentException("Type of user and 
item column must be string.");
+   }
+
+   StreamTableEnvironment tEnv =
+   (StreamTableEnvironment) ((TableImpl) inputs[0])
+   .getTableEnvironment();
+
+   SingleOutputStreamOperator > itemUsers =
+   tEnv.toDataStream(inputs[0])
+   .map(row -> Tuple2.of((String) 
row.getFieldAs(userCol), (String) 

[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

2022-12-20 Thread GitBox


morhidi commented on code in PR #489:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053822550


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java:
##
@@ -260,11 +265,96 @@ private FlinkDeployment cloneDeploymentWithUpgradeMode(
 }
 
 @ParameterizedTest
-@EnumSource(UpgradeMode.class)
-public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) 
throws Exception {
+@MethodSource("testUpgradeJmDeployCannotStartParams")
+public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, 
UpgradeMode toMode)
+throws Exception {

Review Comment:
   Does it make sense, Gyula, to break up this test into multiple methods? It's
really hard to decode what the internal status changes / checks actually
mean.
   - successfulJobSubmit
   - suspendWithSavepoint
   - etc..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] tedhtchang commented on pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


tedhtchang commented on PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#issuecomment-1360404403

   Tested on Kubernetes. LGTM





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


morhidi commented on PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#issuecomment-1360393927

   +1 LGTM





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #487: [FLINK-30411] Validate empty JmSpec and TmSpec

2022-12-20 Thread GitBox


morhidi commented on PR #487:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/487#issuecomment-1360392616

   @gyfora how do you think we should fix this?





[GitHub] [flink-kubernetes-operator] tedhtchang commented on a diff in pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


tedhtchang commented on code in PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#discussion_r1053794906


##
tools/olm/docker-entry.sh:
##
@@ -102,6 +96,10 @@ generate_olm_bundle() {
   yq ea -i ".spec.replaces = \"${PACKAGE_NAME}.v${PREVIOUS_BUNDLE_VERSION}\" | .spec.replaces style=\"\"" "${CSV_FILE}"
 
   yq ea -i "del(.subjects[0].namespace)" "${ROLE_BINDING}"
+
+  # Needed to replace description with new bundle values
+  sed -i "s/release-1.1/release-${BUNDLE_VERSION}/" "${CSV_FILE}"
+  sed -i "s/version: 1.2.0/version: ${BUNDLE_VERSION}/" "${CSV_FILE}"

Review Comment:
   @jbusche Links look good!
   
![image](https://user-images.githubusercontent.com/7155778/208775908-d5e02c9a-67f7-4e19-b487-59f127a725ce.png)
   






[jira] [Commented] (FLINK-30409) Support reopening closed metric groups

2022-12-20 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650008#comment-17650008
 ] 

Mason Chen commented on FLINK-30409:


[~xtsong] Thanks for assigning me the ticket. The PR is up with CI passing. I 
hope you see this message since I don't have permission to select reviewers :) 

> Support reopening closed metric groups
> --
>
> Key: FLINK-30409
> URL: https://issues.apache.org/jira/browse/FLINK-30409
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> Currently, metricGroup.close() will unregister metrics and the underlying 
> metric groups. If the metricGroup is created again via addGroup(), it will 
> silently fail to create metrics since the metric group is in a closed state.
> We need to close metric groups and reopen them because some of the metrics 
> may reference old objects that are no longer relevant/stale and we need to 
> re-create the metric/metric group to point to the new references. For 
> example, we may close `KafkaSourceReader` to remove a topic partition from 
> assignment and then recreate `KafkaSourceReader` with a different set of 
> topic partitions. The metrics should also reflect that.
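
The failure mode described in the ticket can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyMetricGroup`, `addGroupCached` and `addGroupReopening` are hypothetical names invented here and are not Flink's metric classes or API. The sketch assumes that a parent keeps its child groups in a map and hands back the cached child even after that child has been closed, which is one way the "silently fails to create metrics" symptom could arise.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a metric group; hypothetical, not Flink code. */
final class ToyMetricGroup {
    private final Map<String, ToyMetricGroup> children = new HashMap<>();
    private final Map<String, Long> counters = new HashMap<>();
    private boolean closed;

    /** Registers a counter; the call is silently ignored once the group is closed. */
    void counter(String name) {
        if (!closed) {
            counters.put(name, 0L);
        }
    }

    int metricCount() {
        return counters.size();
    }

    boolean isClosed() {
        return closed;
    }

    /** Unregisters all metrics of this group and closes its child groups. */
    void close() {
        closed = true;
        counters.clear();
        children.values().forEach(ToyMetricGroup::close);
    }

    /** Assumed problematic behaviour: return the cached child even if it is closed. */
    ToyMetricGroup addGroupCached(String name) {
        return children.computeIfAbsent(name, k -> new ToyMetricGroup());
    }

    /** Behaviour the ticket asks for: replace a closed child with a fresh group. */
    ToyMetricGroup addGroupReopening(String name) {
        ToyMetricGroup child = children.get(name);
        if (child == null || child.isClosed()) {
            child = new ToyMetricGroup();
            children.put(name, child);
        }
        return child;
    }

    public static void main(String[] args) {
        ToyMetricGroup root = new ToyMetricGroup();

        ToyMetricGroup reader = root.addGroupCached("reader");
        reader.counter("records");
        System.out.println(reader.metricCount()); // prints 1

        reader.close();
        ToyMetricGroup again = root.addGroupCached("reader");
        again.counter("records");
        System.out.println(again.metricCount()); // prints 0: registration was dropped

        ToyMetricGroup fresh = root.addGroupReopening("reader");
        fresh.counter("records");
        System.out.println(fresh.metricCount()); // prints 1
    }
}
```

In this model the second `addGroupCached` call returns the already closed group, so the counter never appears, while `addGroupReopening` swaps in a new group that accepts registrations again.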





[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


jbusche commented on PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#issuecomment-1360379235

   It's looking good - it added the links as expected, and is matching on the 
versions nicely.  
   Deployed well:
   ```
   oc get pods
   NAME                                                              READY   STATUS      RESTARTS   AGE
   0e60453bbf27f3b2271a54772da7acb269b807d208b10b221a80c740496p2ml   0/1     Completed   0          6m33s
   basic-example-56876dc586-wwrbv                                    1/1     Running     0          4m50s
   basic-example-taskmanager-1-1                                     1/1     Running     0          4m38s
   flink-kubernetes-operator-7dd84c798f-52d4k                        2/2     Running     0          6m21s
   olm-flink-operator-catalog-9s9ch                                  1/1     Running     0          7m8s
   ```





[GitHub] [flink-table-store] Gerrrr commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


Gerrrr commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053789276


##
docs/content/docs/features/table-types.md:
##
@@ -0,0 +1,142 @@
+---
+title: "Table Types"
+weight: 1
+type: docs
+aliases:
+- /features/table-types.html
+---
+
+
+# Table Types
+
+Table Store supports various types of tables. Users can specify `write-mode` 
table property to specify table types when creating tables.
+
+## Changelog Tables with Primary Keys
+
+Changelog table is the default table type when creating a table. Users can 
also specify `'write-mode' = 'change-log'` explicitly in table properties when 
creating the table.
+
+Primary keys are a set of columns that are unique for each record. Table Store 
imposes an ordering of data, which means the system will sort the primary key 
within each bucket. Using this feature, users can achieve high performance by 
adding filter conditions on the primary key.
+
+By [defining primary keys]({{< ref 
"docs/sql-api/creating-tables#tables-with-primary-keys" >}}) on a changelog 
table, users can access the following features.
+
+### Merge Engines
+
+When Table Store sink receives two or more records with the same primary keys, 
it will merge them into one record to keep primary keys unique. By specifying 
the `merge-engine` table property, users can choose how records are merged 
together.
+
+#### Deduplicate
+
+`deduplicate` merge engine is the default merge engine. Table Store will only 
keep the latest record and throw away other records with the same primary keys.
+
+Specifically, if the latest record is a `DELETE` record, all records with the 
same primary keys will be deleted.
+
+#### Partial Update
+
+By specifying `'merge-engine' = 'partial-update'`, users can set columns of a 
record across multiple updates and finally get a complete record. Specifically, 
value fields are updated to the latest data one by one under the same primary 
key, but null values are not overwritten.
+
+For example, let's say Table Store receives three records `<1, 23.0, 10, 
NULL>`, `<1, NULL, NULL, 'This is a book'>` and `<1, 25.2, NULL, NULL>`, where 
the first column is the primary key. The final result will be `<1, 25.2, 10, 
'This is a book'>`.
+
+NOTE: For streaming queries, `partial-update` merge engine must be used 
together with `full-compaction` [changelog producer]({{< ref 
"docs/features/table-types#changelog-producers" >}}).
+
+#### Aggregation
+
+Sometimes users only care about aggregated results. The `aggregation` merge 
engine aggregates each value field with the latest data one by one under the 
same primary key according to the aggregate function.
+
+Each field not part of the primary keys must be given an aggregate function, 
specified by the `fields.<field-name>.aggregate-function` table property. For 
example, consider the following table definition.
+
+{{< tabs "aggregation-merge-engine-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+product_id BIGINT,
+price DOUBLE,
+sales BIGINT,
+PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+'merge-engine' = 'aggregation',
+'fields.price.aggregate-function' = 'max',
+'fields.sales.aggregate-function' = 'sum'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Field `price` will be aggregated by the `max` function, and field `sales` will 
be aggregated by the `sum` function. Given two input records `<1, 23.0, 15>` 
and `<1, 30.2, 20>`, the final result will be `<1, 30.2, 35>`.
+
+Currently supported aggregate functions and data types are:
+
+* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and 
DOUBLE.
+* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 
DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
+* `last_value` / `last_non_null_value`: support all data types.
+* `listagg`: supports STRING data type.
+* `bool_and` / `bool_or`: support BOOLEAN data type.
+
+### Changelog Producers
+
+Streaming queries will continuously produce latest changes. These changes can 
come from the underlying table files or from an [external log system]({{< ref 
"docs/features/external-log-systems" >}}) like Kafka. Compared to the external 
log system, changes from table files have lower cost but higher latency 
(depending on how often snapshots are created).
+
+By specifying the `changelog-producer` table property when creating the table, 
users can choose the pattern of changes produced from files.
+
+#### None
+
+By default, no extra changelog producer will be applied to the writer of 
table. Table Store source can only see the merged changes across snapshots, 
like what keys are removed and what are the new values of some keys.
+
+However, these merged changes cannot form a complete changelog, because we 
can't read the old values of the keys directly from them. Merged changes 
require the consumers to "remember" the values of each key and to rewrite the 
values without seeing the old ones.

Review Comment:
   Why do 

[GitHub] [flink-kubernetes-operator] jbusche commented on pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


jbusche commented on PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#issuecomment-1360368627

   OK, let me test this one on my OpenShift cluster, will post results here
   





[GitHub] [flink-kubernetes-operator] jbusche commented on a diff in pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


jbusche commented on code in PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#discussion_r1053777424


##
tools/olm/docker-entry.sh:
##
@@ -102,6 +96,10 @@ generate_olm_bundle() {
   yq ea -i ".spec.replaces = \"${PACKAGE_NAME}.v${PREVIOUS_BUNDLE_VERSION}\" | .spec.replaces style=\"\"" "${CSV_FILE}"
 
   yq ea -i "del(.subjects[0].namespace)" "${ROLE_BINDING}"
+
+  # Needed to replace description with new bundle values
+  sed -i "s/release-1.1/release-${BUNDLE_VERSION}/" "${CSV_FILE}"
+  sed -i "s/version: 1.2.0/version: ${BUNDLE_VERSION}/" "${CSV_FILE}"

Review Comment:
   That's a great idea @gyfora, I've added a RELEASE_VERSION placeholder and 
also I've added a few items under links, specifically:
   - name: Website
     url: https://flink.apache.org/
   - name: Documentation
     url: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
   - name: Mailing list
     url: https://lists.apache.org/list.html?d...@flink.apache.org
   - name: Slack
     url: https://apache-flink.slack.com/join/shared_invite/zt-1llkzbgyt-K2nNGGg88rfsDGLkT09Qzg#/shared-invite/email
   - name: GitHub
     url: https://github.com/apache/flink-kubernetes-operator






[GitHub] [flink] mbalassi merged pull request #21511: Delegation token framework refactoring

2022-12-20 Thread GitBox


mbalassi merged PR #21511:
URL: https://github.com/apache/flink/pull/21511





[GitHub] [flink-table-store] Gerrrr commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


Gerrrr commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053671570


##
docs/content/docs/sql-api/writing-tables.md:
##
@@ -0,0 +1,79 @@
+---
+title: "Writing Tables"
+weight: 4
+type: docs
+aliases:
+- /sql-api/writing-tables.html
+---
+
+
+# Writing Tables
+
+## Applying Records/Changes to Tables
+
+{{< tabs "insert-into-example" >}}
+
+{{< tab "Flink" >}}
+
+Use `INSERT INTO` to apply records and changes to tables.

Review Comment:
   Under `changelog-mode: all`, how does FTS keep `before` values for the 
insertions into the log store?






[GitHub] [flink-table-store] Gerrrr commented on a diff in pull request #443: [FLINK-30458] Refactor Table Store Documentation

2022-12-20 Thread GitBox


Gerrrr commented on code in PR #443:
URL: https://github.com/apache/flink-table-store/pull/443#discussion_r1053657989


##
docs/content/docs/features/table-types.md:
##
@@ -0,0 +1,142 @@
+---
+title: "Table Types"
+weight: 1
+type: docs
+aliases:
+- /features/table-types.html
+---
+
+
+# Table Types
+
+Table Store supports various types of tables. Users can specify `write-mode` 
table property to specify table types when creating tables.
+
+## Changelog Tables with Primary Keys
+
+Changelog table is the default table type when creating a table. Users can 
also specify `'write-mode' = 'change-log'` explicitly in table properties when 
creating the table.
+
+Primary keys are a set of columns that are unique for each record. Table Store 
imposes an ordering of data, which means the system will sort the primary key 
within each bucket. Using this feature, users can achieve high performance by 
adding filter conditions on the primary key.
+
+By [defining primary keys]({{< ref 
"docs/sql-api/creating-tables#tables-with-primary-keys" >}}) on a changelog 
table, users can access the following features.
+
+### Merge Engines
+
+When Table Store sink receives two or more records with the same primary keys, 
it will merge them into one record to keep primary keys unique. By specifying 
the `merge-engine` table property, users can choose how records are merged 
together.
+
+#### Deduplicate
+
+`deduplicate` merge engine is the default merge engine. Table Store will only 
keep the latest record and throw away other records with the same primary keys.
+
+Specifically, if the latest record is a `DELETE` record, all records with the 
same primary keys will be deleted.
+
+#### Partial Update
+
+By specifying `'merge-engine' = 'partial-update'`, users can set columns of a 
record across multiple updates and finally get a complete record. Specifically, 
value fields are updated to the latest data one by one under the same primary 
key, but null values are not overwritten.
+
+For example, let's say Table Store receives three records `<1, 23.0, 10, 
NULL>`, `<1, NULL, NULL, 'This is a book'>` and `<1, 25.2, NULL, NULL>`, where 
the first column is the primary key. The final result will be `<1, 25.2, 10, 
'This is a book'>`.
+
+NOTE: For streaming queries, `partial-update` merge engine must be used 
together with `full-compaction` [changelog producer]({{< ref 
"docs/features/table-types#changelog-producers" >}}).
+
+#### Aggregation
+
+Sometimes users only care about aggregated results. The `aggregation` merge 
engine aggregates each value field with the latest data one by one under the 
same primary key according to the aggregate function.
+
+Each field not part of the primary keys must be given an aggregate function, 
specified by the `fields.<field-name>.aggregate-function` table property. For 
example, consider the following table definition.
+
+{{< tabs "aggregation-merge-engine-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+product_id BIGINT,
+price DOUBLE,
+sales BIGINT,
+PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+'merge-engine' = 'aggregation',
+'fields.price.aggregate-function' = 'max',
+'fields.sales.aggregate-function' = 'sum'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Field `price` will be aggregated by the `max` function, and field `sales` will 
be aggregated by the `sum` function. Given two input records `<1, 23.0, 15>` 
and `<1, 30.2, 20>`, the final result will be `<1, 30.2, 35>`.
+
+Currently supported aggregate functions and data types are:
+
+* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and 
DOUBLE.
+* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 
DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
+* `last_value` / `last_non_null_value`: support all data types.
+* `listagg`: supports STRING data type.
+* `bool_and` / `bool_or`: support BOOLEAN data type.
+
+### Changelog Producers
+
+Streaming queries will continuously produce latest changes. These changes can 
come from the underlying table files or from an [external log system]({{< ref 
"docs/features/external-log-systems" >}}) like Kafka. Compared to the external 
log system, changes from table files have lower cost but higher latency 
(depending on how often snapshots are created).
+
+By specifying the `changelog-producer` table property when creating the table, 
users can choose the pattern of changes produced from files.

Review Comment:
   Does it mean that the `changelog-producer` option is only relevant to the 
changelog from files? Does it do anything when Kafka is used?



##
docs/content/docs/sql-api/writing-tables.md:
##
@@ -0,0 +1,79 @@
+---
+title: "Writing Tables"
+weight: 4
+type: docs
+aliases:
+- /sql-api/writing-tables.html
+---
+
+
+# Writing Tables
+
+## Applying Records/Changes to Tables
+
+{{< tabs "insert-into-example" >}}
+
+{{< tab "Flink" >}}
+
+Use `INSERT INTO` to apply records and changes to tables.
+

[GitHub] [flink] flinkbot commented on pull request #21539: [FLINK-30409][Runtime / Metrics] Return a new metric group when creating closed metric g…

2022-12-20 Thread GitBox


flinkbot commented on PR #21539:
URL: https://github.com/apache/flink/pull/21539#issuecomment-1359996492

   
   ## CI report:
   
   * c8b29b3dac1540467927bb6ff50e43e50ec44df1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30409) Support reopening closed metric groups

2022-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30409:
---
Labels: pull-request-available  (was: )

> Support reopening closed metric groups
> --
>
> Key: FLINK-30409
> URL: https://issues.apache.org/jira/browse/FLINK-30409
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>  Labels: pull-request-available
>
> Currently, metricGroup.close() will unregister metrics and the underlying 
> metric groups. If the metricGroup is created again via addGroup(), it will 
> silently fail to create metrics since the metric group is in a closed state.
> We need to close metric groups and reopen them because some of the metrics 
> may reference old objects that are no longer relevant/stale and we need to 
> re-create the metric/metric group to point to the new references. For 
> example, we may close `KafkaSourceReader` to remove a topic partition from 
> assignment and then recreate `KafkaSourceReader` with a different set of 
> topic partitions. The metrics should also reflect that.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] mas-chen opened a new pull request, #21539: [FLINK-30409][Runtime / Metrics] Return a new metric group when creating closed metric g…

2022-12-20 Thread GitBox


mas-chen opened a new pull request, #21539:
URL: https://github.com/apache/flink/pull/21539

   …roup
   
   
   
   ## What is the purpose of the change
   
   `addGroup` returns a new metric group if the existing metric group is already 
closed, so that implementations can recreate metrics.
   
   ## Brief change log
   
   - addGroup returns a new metric group if it is already closed.
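
The behavior described in the change log can be illustrated with a minimal sketch. This is not Flink's implementation: `SketchMetricGroup` and its methods are hypothetical stand-ins, and the only point shown is that `addGroup` hands back a fresh child when the cached child has been closed, so metrics can be registered again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a metric group; not Flink's MetricGroup API.
class SketchMetricGroup {
    private final Map<String, SketchMetricGroup> children = new HashMap<>();
    private final Map<String, Long> counters = new HashMap<>();
    private boolean closed;

    // Returns the cached child, or a fresh one if none exists or the cached one is closed.
    SketchMetricGroup addGroup(String name) {
        SketchMetricGroup child = children.get(name);
        if (child == null || child.closed) {
            child = new SketchMetricGroup();
            children.put(name, child);
        }
        return child;
    }

    // Registers a counter; returns false on a closed group (the silent-failure case).
    boolean counter(String name) {
        if (closed) {
            return false;
        }
        counters.put(name, 0L);
        return true;
    }

    // Unregisters this group's metrics and closes all child groups.
    void close() {
        closed = true;
        counters.clear();
        children.values().forEach(SketchMetricGroup::close);
        children.clear();
    }
}
```

With this sketch, closing the `reader` child and calling `addGroup("reader")` on the parent again yields a different, open group, which mirrors the `KafkaSourceReader` recreation scenario from the issue.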
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added unit tests and ensured existing unit tests succeed to ensure 
behavior is backward compatible.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink] mbalassi commented on pull request #21511: Delegation token framework refactoring

2022-12-20 Thread GitBox


mbalassi commented on PR #21511:
URL: https://github.com/apache/flink/pull/21511#issuecomment-1359896458

   Will merge this later today.





[jira] [Comment Edited] (FLINK-30456) OLM Bundle Description Version Problems

2022-12-20 Thread Ted Chang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17649882#comment-17649882
 ] 

Ted Chang edited comment on FLINK-30456 at 12/20/22 5:36 PM:
-

Hi Jim,  could you also fix the LINKS showing n/a.
[https://operatorhub.io/operator/flink-kubernetes-operator]
!image-2022-12-20-08-21-02-597.png!

https://sdk.operatorframework.io/docs/olm-integration/generation/#csv-fields


was (Author: JIRAUSER287036):
Hi Jim,  could you also fix the LINKS showing n/a.
https://operatorhub.io/operator/flink-kubernetes-operator
!image-2022-12-20-08-21-02-597.png!

> OLM Bundle Description Version Problems
> ---
>
> Key: FLINK-30456
> URL: https://issues.apache.org/jira/browse/FLINK-30456
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-20-08-21-02-597.png
>
>
> OLM is working great with OperatorHub, but noticed a few items that need 
> fixing.
> 1.  The basic.yaml example version is release-1.1 instead of the latest 
> release (release-1.3).  This needs fixing in two places:
> tools/olm/generate-olm-bundle.sh
> tools/olm/docker-entry.sh
> 2.  The label versions in the description are hardcoded to 1.2.0 instead of 
> the latest release (1.3.0)
> 3. The Provider is blank space " " but soon needs to have some text in there 
> to avoid CI errors with the latest operator-sdk versions.  The person who 
> noticed it recommended "Community" for now, but maybe we can begin making it 
> "The Apache Software Foundation" now?  Not sure if we're ready for that yet 
> or not...
>  
> I'm working on a PR to address these items.  Can you assign the issue to me?  
> Thanks!
> fyi [~tedhtchang] [~gyfora] 





[GitHub] [flink] flinkbot commented on pull request #21538: [FLINK-24386] Guard JobMaster against exceptions from starting the Op…

2022-12-20 Thread GitBox


flinkbot commented on PR #21538:
URL: https://github.com/apache/flink/pull/21538#issuecomment-1359782359

   
   ## CI report:
   
   * 0f68db2a53d68bb850f6cd0ac285075e144f38ab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-24386) JobMaster should guard against exceptions from OperatorCoordinator

2022-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24386:
---
Labels: pull-request-available  (was: )

> JobMaster should guard against exceptions from OperatorCoordinator
> --
>
> Key: FLINK-24386
> URL: https://issues.apache.org/jira/browse/FLINK-24386
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.2
>Reporter: David Morávek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Original report from [~sewen]:
> When the scheduler processes the call to trigger a _globalFailover_
>  and something goes wrong in there, the _JobManager_ gets stuck. Concretely, 
> I have an _OperatorCoordinator_ that throws an exception in 
> _subtaskFailed()_, which gets called as part of processing the failover.
> While this is a bug in that coordinator, the whole thing seems a bit 
> dangerous to me. If there is some bug in any part of the failover logic, we 
> have no safety net. No "hard crash" and let the process be restarted. We only 
> see a log line (below) and everything goes unresponsive.
> {code:java}
> ERROR org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor [] - Caught 
> exception while executing runnable in main thread.
> {code}
> Shouldn't we have some safety nets in place here?
>  * I am wondering if the place where that line is logged should actually 
> invoke the fatal error handler. If an exception propagates out of a main 
> thread action, we need to call off all bets and assume things have gotten 
> inconsistent.
>  * At the very least, the failover procedure itself should be guarded. If an 
> error happens while processing the global failover, then we need to treat 
> this as beyond redemption and declare a fatal error.
> The fatal error would give us a log line and the user a container restart, 
> hopefully fixing things (unless it was a deterministic error).
> [~dmvk] notes:
>  * OperatorCoordinator is part of a public API interface (part of JobGraph).
>  ** Can be provided by implementing CoordinatedOperatorFactory
>  ** This actually gives the issue higher priority than I initially thought.
>  * We should guard against flaws in user code:
>  ** There are two types of interfaces
>  *** (CRITICAL) Public API for JobGraph construction / submission
>  *** Semi-public interfaces such as custom HA Services, this is for power 
> users, so I wouldn't be as concerned there.
>  ** We already do a good job guarding against failures on the TM side
>  ** Considering the critical parts on the JM side, there are two places where 
> the user can "hook" in
>  *** OperatorCoordinator
>  *** InitializeOnMaster, FinalizeOnMaster (batch sinks only, legacy from the 
> Hadoop world)
> --- 
> We should audit all the calls to OperatorCoordinator and handle failures 
> accordingly. We want to avoid unnecessary JVM terminations as much as 
> possible (sometimes it's the only option though).





[GitHub] [flink] hlteoh37 opened a new pull request, #21538: [FLINK-24386] Guard JobMaster against exceptions from starting the Op…

2022-12-20 Thread GitBox


hlteoh37 opened a new pull request, #21538:
URL: https://github.com/apache/flink/pull/21538

   …eratorCoordinator
   
   ## What is the purpose of the change
   * Improve the stability of the JobMaster when an OperatorCoordinator throws an 
Exception upon start(). Rather than restarting the entire JobMaster, we 
restart only the job.
   
   ## Brief change log
   * Catch non-fatal Throwables when starting OperatorCoordinator, and fail the 
job instead of rethrowing on the JobMaster main thread.
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   - Added unit test to verify that an OperatorCoordinator exception triggers a 
job failure without terminating the running thread.
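
The guarded start described above can be sketched roughly as follows. All names here (`CoordinatorStartGuard`, `Coordinator`, `startGuarded`, the `failJob` callback) are hypothetical and are not the JobMaster or OperatorCoordinator API; treating only `VirtualMachineError` as fatal is also an assumption made for the example.

```java
import java.util.function.Consumer;

// Hypothetical names throughout; not Flink's JobMaster or OperatorCoordinator API.
class CoordinatorStartGuard {

    // Stand-in for user-provided coordinator code that may throw on start.
    interface Coordinator {
        void start() throws Exception;
    }

    // Starts the coordinator. Returns true on success, false if the job was failed instead.
    static boolean startGuarded(Coordinator coordinator, Consumer<Throwable> failJob) {
        try {
            coordinator.start();
            return true;
        } catch (Throwable t) {
            // Assumption: JVM-level errors are fatal and must keep propagating.
            if (t instanceof VirtualMachineError) {
                throw (VirtualMachineError) t;
            }
            // Non-fatal: fail the job and keep the calling thread alive.
            failJob.accept(t);
            return false;
        }
    }
}
```

A caller would pass its own job-failure handler as `failJob`; the calling thread continues after a non-fatal exception, which is the property the unit test in this PR is said to verify.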
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink-connector-jdbc] eskabetxe commented on a diff in pull request #8: [FLINK-14102] Introduce DB2Dialect.

2022-12-20 Thread GitBox


eskabetxe commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/8#discussion_r1053513698


##
flink-connector-jdbc/pom.xml:
##
@@ -195,6 +203,14 @@ under the License.
test

 
+   
+   
+   <groupId>org.testcontainers</groupId>
+   <artifactId>db2</artifactId>
+   <version>1.17.3</version>

Review Comment:
   You should use the defined version of org.testcontainers
   
   <version>${testcontainers.version}</version>






[jira] [Created] (FLINK-30466) Detect ineffective scaling operations

2022-12-20 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30466:
--

 Summary: Detect ineffective scaling operations
 Key: FLINK-30466
 URL: https://issues.apache.org/jira/browse/FLINK-30466
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Matyas Orhidi
 Fix For: kubernetes-operator-1.4.0


The autoscaler logic currently does not consider whether a given proposed 
parallelism change will achieve the desired processing rate based on historic 
metrics.

It is possible, for example, that we try to scale down to a parallelism that 
we have already used in the past and that resulted in an immediate scale-up.

This could be avoided by comparing the expected processing rate at a certain 
parallelism with past metrics from that parallelism setting. There should be 
some flexibility and a limited "memory" but some simple checks could reduce 
unstable scaling caused by non-linear performance changes.
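
One way to picture the proposed check is the sketch below. It is only an illustration of the idea in this ticket, not the operator's autoscaler code: `ScalingHistory`, `observe`, `likelyIneffective`, the bounded history size, and the `tolerance` parameter are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; names and parameters are assumptions, not the operator's API.
class ScalingHistory {

    // One past observation: a parallelism that was run and the processing rate it achieved.
    static final class Observation {
        final int parallelism;
        final double observedRate;

        Observation(int parallelism, double observedRate) {
            this.parallelism = parallelism;
            this.observedRate = observedRate;
        }
    }

    private final Deque<Observation> recent = new ArrayDeque<>();
    private final int maxEntries;

    ScalingHistory(int maxEntries) {
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        this.maxEntries = maxEntries;
    }

    // Records an observation, evicting the oldest one once the limited "memory" is full.
    void observe(int parallelism, double observedRate) {
        if (recent.size() == maxEntries) {
            recent.removeFirst();
        }
        recent.addLast(new Observation(parallelism, observedRate));
    }

    // True if the proposed parallelism was tried before and fell short of the
    // target rate by more than the given relative tolerance.
    boolean likelyIneffective(int proposedParallelism, double targetRate, double tolerance) {
        for (Observation o : recent) {
            if (o.parallelism == proposedParallelism
                    && o.observedRate < targetRate * (1.0 - tolerance)) {
                return true;
            }
        }
        return false;
    }
}
```

The bounded deque models the limited "memory", and the tolerance gives the flexibility mentioned above; how a real implementation would weigh or age old observations is left open.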





[jira] [Closed] (FLINK-30244) When task using udf/udtf with jni, on k8s session the old TM will shut down and create new TM or the task will fail

2022-12-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-30244.
--
Resolution: Abandoned

> When task using udf/udtf with jni, on k8s session the old TM will shut down 
> and create new TM or the task will fail
> ---
>
> Key: FLINK-30244
> URL: https://issues.apache.org/jira/browse/FLINK-30244
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Kubernetes Operator, Runtime / 
> Task
>Affects Versions: 1.15.3
>Reporter: AlexHu
>Priority: Major
> Attachments: image-2022-11-30-14-47-50-923.png, 
> image-2022-11-30-15-00-06-710.png, image-2022-11-30-15-04-45-696.png, 
> image-2022-11-30-15-05-29-120.png
>
>
> We face a problem when using Flink on k8s to execute tasks with 
> udf/udtf. When we finish or cancel a job and then submit the same job again, 
> the old TM becomes unreachable and restarts. Why does the TM have to be 
> restarted? In session mode, the TM should be reused by the JM. Moreover, if we 
> turn off the restart strategy, the task fails.
> !image-2022-11-30-14-47-50-923.png!
>  
> On the first submission, the job runs:
> !image-2022-11-30-15-00-06-710.png!
>  
> But after cancelling it and submitting the same job again:
> !image-2022-11-30-15-04-45-696.png!
> Internal server error, but in k8s the pod is running.
> !image-2022-11-30-15-05-29-120.png!
>  





[jira] [Commented] (FLINK-30408) Add unit test for HA metadata check logic

2022-12-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17649891#comment-17649891
 ] 

Gyula Fora commented on FLINK-30408:


merged to release-1.3 4e583a0faa0991c61b37ecd9b937fa3e11c6493a

> Add unit test for HA metadata check logic
> -
>
> Key: FLINK-30408
> URL: https://issues.apache.org/jira/browse/FLINK-30408
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Critical
> Fix For: kubernetes-operator-1.4.0, kubernetes-operator-1.3.1
>
>
> The current mechanism to check for the existence of HA metadata in the 
> operator is not guarded by any unit tests which makes it more susceptible to 
> accidental regressions.
> We should add at least a few simple test cases to cover the expected behaviour





[jira] [Updated] (FLINK-30408) Add unit test for HA metadata check logic

2022-12-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-30408:
---
Fix Version/s: kubernetes-operator-1.3.1

> Add unit test for HA metadata check logic
> -
>
> Key: FLINK-30408
> URL: https://issues.apache.org/jira/browse/FLINK-30408
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Critical
> Fix For: kubernetes-operator-1.4.0, kubernetes-operator-1.3.1
>
>
> The current mechanism to check for the existence of HA metadata in the 
> operator is not guarded by any unit tests which makes it more susceptible to 
> accidental regressions.
> We should add at least a few simple test cases to cover the expected behaviour





[jira] [Commented] (FLINK-30437) State incompatibility issue might cause state loss

2022-12-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17649890#comment-17649890
 ] 

Gyula Fora commented on FLINK-30437:


merged to release-1.3 4e583a0faa0991c61b37ecd9b937fa3e11c6493a

> State incompatibility issue might cause state loss
> --
>
> Key: FLINK-30437
> URL: https://issues.apache.org/jira/browse/FLINK-30437
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0, kubernetes-operator-1.3.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.4.0, kubernetes-operator-1.3.1
>
>
> Even though we set:
> execution.shutdown-on-application-finish: false
> execution.submit-failed-job-on-application-error: true
> If there is a state incompatibility the jobmanager marks the Job failed, 
> cleans up HA metadata and restarts itself. This is a very concerning behaviour, 
> but we have to fix this on the operator side to at least guarantee no state 
> loss.
> The solution is to harden the HA metadata check properly 





[jira] [Updated] (FLINK-30437) State incompatibility issue might cause state loss

2022-12-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-30437:
---
Fix Version/s: kubernetes-operator-1.3.1

> State incompatibility issue might cause state loss
> --
>
> Key: FLINK-30437
> URL: https://issues.apache.org/jira/browse/FLINK-30437
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0, kubernetes-operator-1.3.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.4.0, kubernetes-operator-1.3.1
>
>
> Even though we set:
> execution.shutdown-on-application-finish: false
> execution.submit-failed-job-on-application-error: true
> If there is a state incompatibility the jobmanager marks the Job failed, 
> cleans up HA metadata and restarts itself. This is a very concerning behaviour, 
> but we have to fix this on the operator side to at least guarantee no state 
> loss.
> The solution is to harden the HA metadata check properly 





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #490: [backport][FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss

2022-12-20 Thread GitBox


gyfora merged PR #490:
URL: https://github.com/apache/flink-kubernetes-operator/pull/490





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #491: [FLINK-30456] Fixing Version and provider in OLM Description

2022-12-20 Thread GitBox


gyfora commented on code in PR #491:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/491#discussion_r1053512772


##
tools/olm/docker-entry.sh:
##
@@ -102,6 +96,10 @@ generate_olm_bundle() {
   yq ea -i ".spec.replaces = \"${PACKAGE_NAME}.v${PREVIOUS_BUNDLE_VERSION}\" | 
.spec.replaces style=\"\"" "${CSV_FILE}"
 
   yq ea -i "del(.subjects[0].namespace)" "${ROLE_BINDING}"
+
+  # Needed to replace description with new bundle values
+  sed -i "s/release-1.1/release-${BUNDLE_VERSION}/" "${CSV_FILE}"
+  sed -i "s/version: 1.2.0/version: ${BUNDLE_VERSION}/" "${CSV_FILE}"

Review Comment:
   would it make sense to add some nicer placeholder strings in the CSV_FILE ?






[jira] [Commented] (FLINK-30456) OLM Bundle Description Version Problems

2022-12-20 Thread Ted Chang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17649882#comment-17649882
 ] 

Ted Chang commented on FLINK-30456:
---

Hi Jim,  could you also fix the LINKS showing n/a.
https://operatorhub.io/operator/flink-kubernetes-operator
!image-2022-12-20-08-21-02-597.png!

> OLM Bundle Description Version Problems
> ---
>
> Key: FLINK-30456
> URL: https://issues.apache.org/jira/browse/FLINK-30456
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-20-08-21-02-597.png
>
>
> OLM is working great with OperatorHub, but noticed a few items that need 
> fixing.
> 1.  The basic.yaml example version is release-1.1 instead of the latest 
> release (release-1.3).  This needs fixing in two places:
> tools/olm/generate-olm-bundle.sh
> tools/olm/docker-entry.sh
> 2.  The label versions in the description are hardcoded to 1.2.0 instead of 
> the latest release (1.3.0)
> 3. The Provider is blank space " " but soon needs to have some text in there 
> to avoid CI errors with the latest operator-sdk versions.  The person who 
> noticed it recommended "Community" for now, but maybe we can begin making it 
> "The Apache Software Foundation" now?  Not sure if we're ready for that yet 
> or not...
>  
> I'm working on a PR to address these items.  Can you assign the issue to me?  
> Thanks!
> fyi [~tedhtchang] [~gyfora] 





[jira] [Updated] (FLINK-30456) OLM Bundle Description Version Problems

2022-12-20 Thread Ted Chang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Chang updated FLINK-30456:
--
Attachment: image-2022-12-20-08-21-02-597.png

> OLM Bundle Description Version Problems
> ---
>
> Key: FLINK-30456
> URL: https://issues.apache.org/jira/browse/FLINK-30456
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-20-08-21-02-597.png
>
>
> OLM is working great with OperatorHub, but noticed a few items that need 
> fixing.
> 1.  The basic.yaml example version is release-1.1 instead of the latest 
> release (release-1.3).  This needs fixing in two places:
> tools/olm/generate-olm-bundle.sh
> tools/olm/docker-entry.sh
> 2.  The label versions in the description are hardcoded to 1.2.0 instead of 
> the latest release (1.3.0)
> 3. The Provider is blank space " " but soon needs to have some text in there 
> to avoid CI errors with the latest operator-sdk versions.  The person who 
> noticed it recommended "Community" for now, but maybe we can begin making it 
> "The Apache Software Foundation" now?  Not sure if we're ready for that yet 
> or not...
>  
> I'm working on a PR to address these items.  Can you assign the issue to me?  
> Thanks!
> fyi [~tedhtchang] [~gyfora] 





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #492: [FLINK-30463] Start stabilization period only after job goes into RUNNING

2022-12-20 Thread GitBox


gyfora commented on code in PR #492:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/492#discussion_r1053501498


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##
@@ -121,13 +122,22 @@ public boolean scaleResource(
 
 private boolean stabilizationPeriodPassed(
 AbstractFlinkResource resource, Configuration conf) {
-var now = clock.instant();
+var jobStatus = resource.getStatus().getJobStatus();
+
+if (!JobStatus.RUNNING.name().equals(jobStatus.getState())) {

Review Comment:
   This might be a redundant check because the `JobAutoscaler` class already 
runs this check before calling any collection, evaluation etc.
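
For readers without the full diff, the idea behind the PR title (start the stabilization period only once the job is RUNNING) can be sketched as below. This is an assumption-laden illustration, not the `ScalingExecutor` code: the `runningSince` timestamp, the `stabilizationInterval` parameter, and the `StabilizationCheck` class are hypothetical, and the diff above shows only the state check.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch; the real method reads its inputs from the resource status and configuration.
class StabilizationCheck {

    // True once the job is RUNNING and has been running for at least the stabilization interval.
    static boolean stabilizationPeriodPassed(
            String jobState, Instant runningSince, Duration stabilizationInterval, Clock clock) {
        if (!"RUNNING".equals(jobState)) {
            // The period cannot have passed if the job is not running at all.
            return false;
        }
        Instant stableAt = runningSince.plus(stabilizationInterval);
        return !clock.instant().isBefore(stableAt);
    }
}
```

Passing a `Clock` keeps the check testable with a fixed time. If the caller already filters out non-running jobs, as the review comment suggests, the state check in the sketch would be redundant there as well.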





