[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

2022-05-17 Thread GitBox


zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875493129


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
 return true;
 } else if (obj != null && obj.getClass() == getClass()) {
 ExecutionAttemptID that = (ExecutionAttemptID) obj;
-return that.executionAttemptId.equals(this.executionAttemptId);
+return that.executionGraphId.equals(this.executionGraphId)
+&& that.executionVertexId.equals(this.executionVertexId)
+&& that.attemptNumber == this.attemptNumber;
 } else {
 return false;
 }
 }
 
 @Override
 public int hashCode() {
-return executionAttemptId.hashCode();
+return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
 }
 
 @Override
 public String toString() {
-return executionAttemptId.toString();
+return String.format(
+"%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+}
+
+public String getLogString() {
+if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+return toString();
+} else {
+return String.format(
+"%s_%s_%d",
+executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   > logging the EG ID is fine, but this can't be done in isolation. There must 
be a way to correlate this id with a specific job submission; for example we 
could log the EG ID when the EG for a given job was created.
   
   Totally agree. Will add a log when creating the EG.
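
   A minimal sketch of what that correlation log could look like (hypothetical names and placement, for illustration only; the point is to log the full ExecutionGraphID whose 4-character prefix appears in getLogString()):

   ```java
   // Hypothetical sketch, not the actual Flink change: log the full
   // ExecutionGraphID once when the ExecutionGraph is created so the
   // abbreviated prefix in task-level logs can be mapped back to a job.
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class ExecutionGraphCreationLog {
       private static final Logger LOG =
               LoggerFactory.getLogger(ExecutionGraphCreationLog.class);

       static void logCreated(String jobId, String executionGraphId) {
           LOG.info("Created execution graph {} for job {}.", executionGraphId, jobId);
       }
   }
   ```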



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-17295) Refactor the ExecutionAttemptID to consist of ExecutionVertexID and attemptNumber

2022-05-17 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538581#comment-17538581
 ] 

Zhu Zhu commented on FLINK-17295:
-

The benchmark value is the size of a serialized TDD.

The difference between the two proposals is that each execution in #1 has a 
unique AbstractID, while all executions in #2 share the same ExecutionGraphID. 
Therefore, given parallelism *P*, a TDD in #1 contains *P* unique 
{*}AbstractID{*}s (each 16 bytes), while a TDD in #2 contains *P* 
{*}references{*} (each 4 bytes) to the shared ExecutionGraphID.
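
For illustration, a self-contained sketch (plain java.io serialization rather than Flink's actual TDD serialization path) of how P references to one shared object serialize much smaller than P distinct IDs:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class SharedIdSizeDemo {
    public static void main(String[] args) throws IOException {
        int parallelism = 100; // P
        Random rnd = new Random(42);

        // Proposal #1: each of the P entries carries its own 16-byte ID.
        List<byte[]> uniqueIds = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            byte[] id = new byte[16];
            rnd.nextBytes(id);
            uniqueIds.add(id);
        }

        // Proposal #2: all P entries point at one shared ID; the serializer
        // writes the object once and emits small back-references afterwards.
        byte[] sharedId = new byte[16];
        rnd.nextBytes(sharedId);
        List<byte[]> sharedRefs = new ArrayList<>(Collections.nCopies(parallelism, sharedId));

        System.out.println("unique IDs : " + serializedSize(uniqueIds) + " bytes");
        System.out.println("shared ID  : " + serializedSize(sharedRefs) + " bytes");
    }

    private static int serializedSize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.size();
    }
}
{code}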

> Refactor the ExecutionAttemptID to consist of ExecutionVertexID and 
> attemptNumber
> -
>
> Key: FLINK-17295
> URL: https://issues.apache.org/jira/browse/FLINK-17295
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Yangze Guo
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Make the ExecutionAttemptID be composed of (ExecutionVertexID, 
> attemptNumber).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27675) Improve manual savepoint tracking

2022-05-17 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-27675:
--

Assignee: Matyas Orhidi

> Improve manual savepoint tracking
> -
>
> Key: FLINK-27675
> URL: https://issues.apache.org/jira/browse/FLINK-27675
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Matyas Orhidi
>Priority: Blocker
> Fix For: kubernetes-operator-1.0.0
>
>
> There are 2 problems with the manual savepoint result observing logic that 
> can cause the reconciler to not make progress with the deployment 
> (recoveries, upgrades etc.).
>  # Whenever the jobmanager deployment is not in READY state or the job itself 
> is not RUNNING, the trigger info must be reset and we should not try to query 
> it anymore. Flink will not retry the savepoint if the job fails or is 
> restarted anyway.
>  # If there is a definitive error when fetching the savepoint status (such as: 
> There is no savepoint operation with triggerId=xxx for job ) we should simply 
> reset the trigger. These errors will never go away on their own and will 
> simply cause the deployment to get stuck observing/waiting for a savepoint 
> to complete.





[jira] [Comment Edited] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running

2022-05-17 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538539#comment-17538539
 ] 

Nicholas Jiang edited comment on FLINK-27257 at 5/18/22 5:21 AM:
-

I have discussed this offline with [~wangyang0918]. It's hard to improve the 
isJobRunning implementation because the JobStatusMessages returned by 
ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, 
so there is no way to judge whether all the tasks in the job are in a RUNNING 
ExecutionState, nor, for batch jobs, whether the job is really running.

The current idea is that if the error indicates that not all required tasks 
are currently running, we continue to trigger the savepoint in the next 
reconciliation until it is successfully triggered.

[~gyfora], [~wangyang0918] WDYT?
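
A minimal sketch of that retry idea (hypothetical, simplified API; not the operator's actual code):
{code:java}
public class SavepointTriggerRetry {

    interface FlinkService {
        String triggerSavepoint(String jobId) throws Exception;
    }

    /**
     * Returns the triggerId on success, or null when not all tasks are
     * running yet, so the next reconciliation loop simply tries again.
     */
    static String tryTrigger(FlinkService service, String jobId) throws Exception {
        try {
            return service.triggerSavepoint(jobId);
        } catch (Exception e) {
            String msg = String.valueOf(e.getMessage());
            if (msg.contains("Not all required tasks are currently running")) {
                return null; // retry on the next reconciliation
            }
            throw e; // unrelated failure: surface it
        }
    }
}
{code}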


was (Author: nicholasjiang):
I have offline discussed with [~wangyang0918]. It's hard to improve the 
isJobRunning implementation because the returned JobStatusMessages of 
ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, 
which cause that there is no way to judge whether all the ExecutionState of 
tasks are running in the job and for batch tasks how to judge whether the job 
is really running.

The current idea is that if the error is found to be not all required tasks are 
currently running, then continue to trigger savepoint in the next 
reconciliation until it is successfully triggered.

[~gyfora][~wangyang0918] WDYT?

> Flink kubernetes operator triggers savepoint failed because of not all tasks 
> running
> 
>
> Key: FLINK-27257
> URL: https://issues.apache.org/jira/browse/FLINK-27257
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Nicholas Jiang
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> {code:java}
> 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService       [INFO 
> ][default/flink-example-statemachine] Fetching savepoint result with 
> triggerId: 182d7f176496856d7b33fe2f3767da18
> 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService       
> [ERROR][default/flink-example-statemachine] Savepoint error
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Source: Custom Source (1/2) of job 
>  is not being executed at the moment. 
> Aborting checkpoint. Failure reason: Not all required tasks are currently 
> running.
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at 

[jira] [Created] (FLINK-27675) Improve manual savepoint tracking

2022-05-17 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-27675:
--

 Summary: Improve manual savepoint tracking
 Key: FLINK-27675
 URL: https://issues.apache.org/jira/browse/FLINK-27675
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.0.0


There are 2 problems with the manual savepoint result observing logic that can 
cause the reconciler to not make progress with the deployment (recoveries, 
upgrades etc.).
 # Whenever the jobmanager deployment is not in READY state or the job itself 
is not RUNNING, the trigger info must be reset and we should not try to query 
it anymore. Flink will not retry the savepoint if the job fails or is 
restarted anyway.
 # If there is a definitive error when fetching the savepoint status (such as: 
There is no savepoint operation with triggerId=xxx for job ) we should simply 
reset the trigger. These errors will never go away on their own and will simply 
cause the deployment to get stuck observing/waiting for a savepoint to 
complete.
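
A minimal illustration of the two fixes (hypothetical simplified types, not the operator's real API):
{code:java}
public class ManualSavepointObserver {

    enum DeploymentState { READY, ERROR, MISSING }

    enum JobState { RUNNING, FAILED, RESTARTING }

    static class TriggerInfo {
        String triggerId;

        void reset() {
            triggerId = null;
        }
    }

    static void observe(DeploymentState deployment, JobState job, TriggerInfo trigger) {
        if (trigger.triggerId == null) {
            return; // nothing pending
        }
        // Problem 1: Flink will not retry the savepoint for a failed or
        // restarted job, so stop tracking when the JM or job is unhealthy.
        if (deployment != DeploymentState.READY || job != JobState.RUNNING) {
            trigger.reset();
            return;
        }
        try {
            fetchSavepointStatus(trigger.triggerId);
        } catch (IllegalStateException e) {
            // Problem 2: "There is no savepoint operation with triggerId=..."
            // never resolves on its own; reset instead of polling forever.
            trigger.reset();
        }
    }

    static void fetchSavepointStatus(String triggerId) {
        // Placeholder for the REST query against the JobManager.
    }
}
{code}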





[GitHub] [flink] cun8cun8 closed pull request #19740: [FLINK-26480]Support window_all in Python DataStream API

2022-05-17 Thread GitBox


cun8cun8 closed pull request #19740: [FLINK-26480]Support window_all in Python 
DataStream API
URL: https://github.com/apache/flink/pull/19740





[GitHub] [flink] cun8cun8 closed pull request #19755: [FLINK-26480]Support window_all in Python DataStream API

2022-05-17 Thread GitBox


cun8cun8 closed pull request #19755: [FLINK-26480]Support window_all in Python 
DataStream API
URL: https://github.com/apache/flink/pull/19755





[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875460374


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##
@@ -237,7 +210,14 @@ public RollingKvWriter(Supplier writerFactory, long targetFileSize
 private Supplier createWriterFactory(int level) {
 return () -> {
 try {
-return new KvFileWriter(new KvBulkWriterFactory(), pathFactory.newPath(), level);
+FileWriter.Factory factory =

Review Comment:
   I decided to revert this change (I mean putting the factory initialization 
into the constructor), because it breaks the unit test (I think the cause is 
the reuse of `KeyValueSerializer`).
   
   After applying this change: 
https://github.com/apache/flink-table-store/pull/117/commits/0dd583045be08cc19990c4c98370bab1aad3ddde,
   the [unit 
test](https://github.com/apache/flink-table-store/pull/117#issuecomment-1129526584) 
works fine.






[jira] [Assigned] (FLINK-27666) Cover manual savepoint triggering in E2E tests

2022-05-17 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-27666:
--

Assignee: Aitozi

> Cover manual savepoint triggering in E2E tests
> --
>
> Key: FLINK-27666
> URL: https://issues.apache.org/jira/browse/FLINK-27666
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Aitozi
>Priority: Major
>
> We should extend our e2e tests so that they cover manual savepoint triggering 
> using the savepointTriggerNonce.
> We should verify that the savepoint was triggered and recorded in the status, 
> and that the trigger information is cleared afterwards.





[jira] [Updated] (FLINK-27643) Document new deployment lifecycle features for the operator

2022-05-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27643:
---
Labels: pull-request-available  (was: )

> Document new deployment lifecycle features for the operator
> ---
>
> Key: FLINK-27643
> URL: https://issues.apache.org/jira/browse/FLINK-27643
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> We should document the changes and new features to the core lifecycle 
> management logic, including:
>  * JM Deployment Recovery
>  * Rollbacks
>  * Any changed upgrade behavior





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #219: [FLINK-27643] Add new features to lifecycle management docs

2022-05-17 Thread GitBox


Aitozi commented on code in PR #219:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/219#discussion_r875455510


##
docs/content/docs/custom-resource/job-management.md:
##
@@ -65,33 +66,38 @@ kubectl delete flinkdeployment my-deployment
 ```
 
 {{< hint danger >}}
-Deleting a deployment will remove all checkpoint and status information. Future deployments will from an empty state unless manually overriden by the user.
+Deleting a deployment will remove all checkpoint and status information. Future deployments will from an empty state unless manually overridden by the user.
 {{< /hint >}}
 
 ## Stateful and stateless application upgrades
 
-When the spec changes for a FlinkDeployment the running Application or Session cluster must be upgraded.
+When the spec changes for FlinkDeployment and FlinkSessionJob resources, the running application must be upgraded.
 In order to do this the operator will stop the currently running job (unless already suspended) and redeploy it using the latest spec and state carried over from the previous run for stateful applications.
 
 Users have full control on how state should be managed when stopping and restoring stateful applications using the `upgradeMode` setting of the JobSpec.
 
-Supported values:`stateless`, `savepoint`, `last-state`
+Supported values: `stateless`, `savepoint`, `last-state`
 
 The `upgradeMode` setting controls both the stop and restore mechanisms as detailed in the following table:
 
-| | Stateless | Last State | Savepoint |
-| -- | -- | -- | -- |
-| Config Requirement | None | Checkpointing & Kubernetes HA Enabled | Savepoint directory defined |
-| Job Status Requirement | None* | None* | Job Running |
-| Suspend Mechanism | Cancel / Delete | Delete Flink deployment (keep HA metadata) | Cancel with savepoint |
-| Restore Mechanism | Deploy from empty state | Recover last state using HA metadata | Restore From savepoint |
+| | Stateless | Last State | Savepoint |
+| -- | -- | -- | -- |
+| Config Requirement | None | Checkpointing & Kubernetes HA Enabled | Checkpoint/Savepoint directory defined |
+| Job Status Requirement | None | HA metadata available | Job Running* |
+| Suspend Mechanism | Cancel / Delete | Delete Flink deployment (keep HA metadata) | Cancel with savepoint |
+| Restore Mechanism | Deploy from empty state | Recover last state using HA metadata | Restore From savepoint |
+| Production Use | Not recommended | Recommended | Recommended |
 
-*\*In general no update can be executed while a savepoint operation is in progress*
 
-The three different upgrade modes are intended to support different use-cases:
- - stateless: Stateless applications, prototyping
- - last-state: Suitable for most stateful production applications. Quick upgrades in any application state (even for failing jobs), does not require a healthy job. Requires Flink Kubernetes HA configuration (see example below).
- - savepoint: Suitable for forking, migrating applications. Requires a healthy running job as it requires a savepoint operation before shutdown.
+*\*When Kubernetes HA is enabled the `savepoint` upgrade mode may fall back to the `last-state` behaviour in cases where the job is in an unhealthy state.*
+
+The three  upgrade modes are intended to support different scenarios:

Review Comment:
   extra space






[jira] [Commented] (FLINK-27666) Cover manual savepoint triggering in E2E tests

2022-05-17 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538567#comment-17538567
 ] 

Aitozi commented on FLINK-27666:


I could work on this. Please assign it to me; I will try to push a PR today.

> Cover manual savepoint triggering in E2E tests
> --
>
> Key: FLINK-27666
> URL: https://issues.apache.org/jira/browse/FLINK-27666
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
>
> We should extend our e2e tests so that they cover manual savepoint triggering 
> using the savepointTriggerNonce.
> We should verify that the savepoint was triggered and recorded in the status, 
> and that the trigger information is cleared afterwards.





[GitHub] [flink] flinkbot commented on pull request #19755: [FLINK-26480]Support window_all in Python DataStream API

2022-05-17 Thread GitBox


flinkbot commented on PR #19755:
URL: https://github.com/apache/flink/pull/19755#issuecomment-1129540427

   
   ## CI report:
   
   * ec83162a1f16488d6060dd977f9b5168d0e8f16f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] cun8cun8 opened a new pull request, #19755: [FLINK-26480]Support window_all in Python DataStream API

2022-05-17 Thread GitBox


cun8cun8 opened a new pull request, #19755:
URL: https://github.com/apache/flink/pull/19755

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Commented] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

2022-05-17 Thread Zhiwen Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538560#comment-17538560
 ] 

Zhiwen Sun commented on FLINK-27663:


I have changed the component to {{Connectors/Kafka}}.

Flink lacks a unit test for the following method:
{code:java}
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema#deserialize(org.apache.kafka.clients.consumer.ConsumerRecord, org.apache.flink.util.Collector){code}
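
For context, the null-dropping pattern being described looks roughly like this (a hedged illustration with a simplified interface, not the exact connector code):
{code:java}
import java.io.IOException;

import org.apache.flink.util.Collector;

interface ExampleDeserializationSchema<T> {

    T deserialize(byte[] message) throws IOException;

    // The two-argument variant silently drops null results, so a Kafka
    // tombstone (null value) never reaches the collector as a DELETE row.
    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T result = deserialize(message);
        if (result != null) {
            out.collect(result);
        }
    }
}
{code}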

> upsert-kafka can't process delete message from upsert-kafka sink
> 
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Zhiwen Sun
>Priority: Major
>
> upsert-kafka writes DELETE data as Kafka messages with null values 
> (indicating a tombstone for the key).
> But when upsert-kafka is used as a source table to consume Kafka messages 
> written by the upsert-kafka sink, DELETE messages will be ignored.
>  
> Related SQL:
>  
>  
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>  
>  
> The problem may be caused by DeserializationSchema#deserialize: 
> this method does not collect data when the subclass's deserialize returns null.
>  
>  





[jira] [Comment Edited] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

2022-05-17 Thread Zhiwen Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538556#comment-17538556
 ] 

Zhiwen Sun edited comment on FLINK-27663 at 5/18/22 3:42 AM:
-

 

1. Produce a DELETE message to Kafka:
{code:java}
echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic 
test_use --property "parse.key=true" --property "key.separator=#"{code}
 
2. Run a Flink SQL job:
{code:java}
SET sql-client.execution.result-mode = tableau;
create table delete_test(
  id bigint,
  PRIMARY KEY (id) NOT ENFORCED
) WITH ( 'connector' = 'upsert-kafka',
 'topic' = 'test_use',
 'properties.bootstrap.servers' = 'your broker',
 'properties.group.id' = 'your group id',
 'value.json.fail-on-missing-field' = 'false',
 'value.json.ignore-parse-errors' = 'true',
 'key.json.fail-on-missing-field' = 'false',
 'key.json.ignore-parse-errors' = 'true',
 'key.format' = 'json',
 'value.format' = 'json');
select
*
from
delete_test
;
{code}

 

3. Expected output:
 
{code:java}
+----+------------------+
| op |               id |
+----+------------------+
| -D |                1 | {code}
 
 
Actual: no output.
 


was (Author: pensz):
 

1. produce a DELETE message to kafka 
{code:java}
echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic 
test_use --property "parse.key=true" --property "key.separator=#"{code}
 
2. run a flink sql job:
SET sql-client.execution.result-mode = tableau;

create table delete_test(
  id bigint,
  PRIMARY KEY (id) NOT ENFORCED
) WITH ( 'connector' = 'upsert-kafka',
 'topic' = 'test_use',
 'properties.bootstrap.servers' = 'your broker',
 'properties.group.id' = 'your group id',
 'value.json.fail-on-missing-field' = 'false',
 'value.json.ignore-parse-errors' = 'true',
 'key.json.fail-on-missing-field' = 'false',
 'key.json.ignore-parse-errors' = 'true',
 'key.format' = 'json',
 'value.format' = 'json');
select
*
from
delete_test
;
 

3.  expect : 
 
{code:java}
++--+
| op |               id |
++--+
| -D |                    1 | {code}
 
 
actual: no output
 

> upsert-kafka can't process delete message from upsert-kafka sink
> 
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Zhiwen Sun
>Priority: Major
>
> upsert-kafka writes DELETE data as Kafka messages with null values 
> (indicating a tombstone for the key).
> But when upsert-kafka is used as a source table to consume Kafka messages 
> written by the upsert-kafka sink, DELETE messages will be ignored.
>  
> Related SQL:
>  
>  
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>  
>  
> The problem may be caused by DeserializationSchema#deserialize: 
> this method does not collect data when the subclass's deserialize returns null.
>  
>  





[jira] [Commented] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

2022-05-17 Thread Zhiwen Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538556#comment-17538556
 ] 

Zhiwen Sun commented on FLINK-27663:


 

1. Produce a DELETE message to Kafka:
{code:java}
echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic 
test_use --property "parse.key=true" --property "key.separator=#"{code}
 
2. Run a Flink SQL job:
SET sql-client.execution.result-mode = tableau;

create table delete_test(
  id bigint,
  PRIMARY KEY (id) NOT ENFORCED
) WITH ( 'connector' = 'upsert-kafka',
 'topic' = 'test_use',
 'properties.bootstrap.servers' = 'your broker',
 'properties.group.id' = 'your group id',
 'value.json.fail-on-missing-field' = 'false',
 'value.json.ignore-parse-errors' = 'true',
 'key.json.fail-on-missing-field' = 'false',
 'key.json.ignore-parse-errors' = 'true',
 'key.format' = 'json',
 'value.format' = 'json');
select
*
from
delete_test
;
 

3. Expected output:
 
{code:java}
+----+------------------+
| op |               id |
+----+------------------+
| -D |                1 | {code}
 
 
Actual: no output.
 

> upsert-kafka can't process delete message from upsert-kafka sink
> 
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Zhiwen Sun
>Priority: Major
>
> upsert-kafka writes DELETE data as Kafka messages with null values 
> (indicating a tombstone for the key).
> But when upsert-kafka is used as a source table to consume Kafka messages 
> written by the upsert-kafka sink, DELETE messages will be ignored.
>  
> Related SQL:
>  
>  
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>  
>  
> The problem may be caused by DeserializationSchema#deserialize: 
> this method does not collect data when the subclass's deserialize returns null.
>  
>  





[jira] [Updated] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

2022-05-17 Thread Zhiwen Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhiwen Sun updated FLINK-27663:
---
Component/s: Connectors / Kafka
 (was: Formats (JSON, Avro, Parquet, ORC, SequenceFile))

> upsert-kafka can't process delete message from upsert-kafka sink
> 
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Zhiwen Sun
>Priority: Major
>
> upsert-kafka writes DELETE data as Kafka messages with null values 
> (indicating a tombstone for the key).
> But when upsert-kafka is used as a source table to consume Kafka messages 
> written by the upsert-kafka sink, DELETE messages will be ignored.
>  
> Related SQL:
>  
>  
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>  
>  
> The problem may be caused by DeserializationSchema#deserialize: 
> this method does not collect data when the subclass's deserialize returns null.
>  
>  





[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

2022-05-17 Thread GitBox


xinbinhuang commented on PR #17873:
URL: https://github.com/apache/flink/pull/17873#issuecomment-1129531377

   @flinkbot run azure





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #101: [FLINK-27366] Record schema on filesystem path

2022-05-17 Thread GitBox


JingsongLi commented on code in PR #101:
URL: https://github.com/apache/flink-table-store/pull/101#discussion_r875435428


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
+
+/** Schema Manager to manage schema versions. */
+public class SchemaManager {
+
+private static final String SCHEMA_PREFIX = "schema-";
+
+private final Path tableRoot;
+
+/** Default no lock. */
+private Lock lock = Callable::call;
+
+public SchemaManager(Path tableRoot) {
+this.tableRoot = tableRoot;
+}
+
+public SchemaManager withLock(Lock lock) {
+this.lock = lock;
+return this;
+}
+
+/** @return latest schema. */
+public Optional latest() {
+try {
+return listVersionedFiles(schemaDirectory(), SCHEMA_PREFIX)
+.reduce(Math::max)
+.map(this::schema);

Review Comment:
   Schema changes are a very low-frequency thing (compared to snapshot generation).
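
   For readers skimming the diff, a self-contained sketch of the "latest version by numeric suffix" pattern used in `latest()` above (plain java.nio, assuming `schema-<id>` file naming):

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Optional;
   import java.util.stream.Stream;

   class LatestSchemaLookup {
       private static final String SCHEMA_PREFIX = "schema-";

       // Pick the highest numeric suffix among files named "schema-<id>".
       static Optional<Long> latestSchemaId(Path schemaDirectory) {
           try (Stream<Path> files = Files.list(schemaDirectory)) {
               return files.map(p -> p.getFileName().toString())
                       .filter(n -> n.startsWith(SCHEMA_PREFIX))
                       .map(n -> Long.parseLong(n.substring(SCHEMA_PREFIX.length())))
                       .reduce(Math::max);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```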






[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #101: [FLINK-27366] Record schema on filesystem path

2022-05-17 Thread GitBox


JingsongLi commented on code in PR #101:
URL: https://github.com/apache/flink-table-store/pull/101#discussion_r875435232


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
+
+/** Schema Manager to manage schema versions. */
+public class SchemaManager {
+
+private static final String SCHEMA_PREFIX = "schema-";
+
+private final Path tableRoot;
+
+/** Default no lock. */
+private Lock lock = Callable::call;
+
+public SchemaManager(Path tableRoot) {
+this.tableRoot = tableRoot;
+}
+
+public SchemaManager withLock(Lock lock) {
+this.lock = lock;
+return this;
+}
+
+/** @return latest schema. */
+public Optional latest() {
+try {
+return listVersionedFiles(schemaDirectory(), SCHEMA_PREFIX)
+.reduce(Math::max)
+.map(this::schema);

Review Comment:
   I don't think it's necessary for schemas because there won't be too many schema versions.






[GitHub] [flink-table-store] openinx commented on pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on PR #117:
URL: 
https://github.com/apache/flink-table-store/pull/117#issuecomment-1129526584

   The broken e2e unit tests are:
   
   ```
   Error:  Tests run: 10, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 18.257 s <<< FAILURE! - in org.apache.flink.table.store.file.mergetree.MergeTreeTest
   Error:  testWriteMany  Time elapsed: 5.328 s  <<< FAILURE!
   org.opentest4j.AssertionFailedError: 
   
   expected: [TestRecord{kind=ADD, k=0, v=1439555325},
   TestRecord{kind=ADD, k=2, v=-87577981},
   TestRecord{kind=ADD, k=3, v=-1671555357},
   TestRecord{kind=ADD, k=4, v=-1885526753},
   TestRecord{kind=ADD, k=6, v=-1780307938},
   TestRecord{kind=ADD, k=7, v=-1031004329},
   TestRecord{kind=ADD, k=8, v=-1545344531},
   TestRecord{kind=ADD, k=14, v=1882741690},
   TestRecord{kind=ADD, k=15, v=1399126480},
   TestRecord{kind=ADD, k=16, v=1331804956},
   TestRecord{kind=ADD, k=18, v=1642412855},
   TestRecord{kind=ADD, k=20, v=-413068575},
   TestRecord{kind=ADD, k=22, v=1865500343},
   TestRecord{kind=ADD, k=23, v=-876643231},
   TestRecord{kind=ADD, k=26, v=-351327562},
   TestRecord{kind=ADD, k=28, v=-2031549655},
   TestRecord{kind=ADD, k=30, v=-241253083},
   TestRecord{kind=ADD, k=32, v=1256513581},
   TestRecord{kind=ADD, k=33, v=-1094310640},
   TestRecord{kind=ADD, k=34, v=-206957727},
   TestRecord{kind=ADD, k=35, v=545094851},
   TestRecord{kind=ADD, k=36, v=2090965492},
   TestRecord{kind=ADD, k=37, v=-612267756},
   TestRecord{kind=ADD, k=41, v=-1668976908},
   TestRecord{kind=ADD, k=42, v=1582283169},
   TestRecord{kind=ADD, k=43, v=-1390868689},
   TestRecord{kind=ADD, k=44, v=-95457355},
   TestRecord{kind=ADD, k=46, v=-97023},
   TestRecord{kind=ADD, k=47, v=770974724},
   TestRecord{kind=ADD, k=48, v=-256528682},
   TestRecord{kind=ADD, k=49, v=871707733},
   TestRecord{kind=ADD, k=50, v=-108981698},
   TestRecord{kind=ADD, k=52, v=-1922275264},
   TestRecord{kind=ADD, k=53, v=1127951319},
   TestRecord{kind=ADD, k=55, v=1949021850},
   TestRecord{kind=ADD, k=56, v=-1780698771},
   TestRecord{kind=ADD, k=57, v=-678952533},
   TestRecord{kind=ADD, k=60, v=1786413275},
   TestRecord{kind=ADD, k=61, v=1073764052},
   TestRecord{kind=ADD, k=62, v=1119198658},
   TestRecord{kind=ADD, k=63, v=-1960775568},
   TestRecord{kind=ADD, k=64, v=1396679991},
   TestRecord{kind=ADD, k=65, v=-1823346675},
   TestRecord{kind=ADD, k=66, v=49904175},
   TestRecord{kind=ADD, k=67, v=-1213641065},
   TestRecord{kind=ADD, k=68, v=1017599222},
   TestRecord{kind=ADD, k=70, v=730983381},
   TestRecord{kind=ADD, k=71, v=632902325},
   TestRecord{kind=ADD, k=72, v=1677474125},
   TestRecord{kind=ADD, k=6322, v=-1577426721},
   TestRecord{kind=ADD, k=6713, v=477480864},
   TestRecord{kind=ADD, k=6766, v=-195595673},
   TestRecord{kind=ADD, k=6878, v=922070171},
   TestRecord{kind=ADD, k=6908, v=1279871284},
   TestRecord{kind=ADD, k=6964, v=-183467818},
   TestRecord{kind=ADD, k=7212, v=1913728452},
   TestRecord{kind=ADD, k=7277, v=777112023},
   TestRecord{kind=ADD, k=7297, v=-451539894},
   TestRecord{kind=ADD, k=7306, v=363784635},
   TestRecord{kind=ADD, k=7378, v=-1455922425},
   TestRecord{kind=ADD, k=7532, v=-2058675165},
   TestRecord{kind=ADD, k=7535, v=885332866},
   TestRecord{kind=ADD, k=7566, v=-1491380324},
   TestRecord{kind=ADD, k=7626, v=383380493},
   TestRecord{kind=ADD, k=7701, v=1901042762},
   TestRecord{kind=ADD, k=7797, v=988033575},
   TestRecord{kind=ADD, k=7972, v=-1102848478},
   TestRecord{kind=ADD, k=7984, v=1336320866},
   TestRecord{kind=ADD, k=8335, v=-166648690},
   TestRecord{kind=ADD, k=8362, v=-1958838012},
   TestRecord{kind=ADD, k=8676, v=-1305920106},
   TestRecord{kind=ADD, k=8824, v=-1534970469},
   TestRecord{kind=ADD, k=9003, v=-25406237},
   TestRecord{kind=ADD, k=9023, v=-1444326810},
   TestRecord{kind=ADD, k=9125, v=-307503062},
   TestRecord{kind=ADD, k=9144, v=-1251249360},
   TestRecord{kind=ADD, k=9291, v=158168057},
   TestRecord{kind=ADD, k=9346, v=1456276531},
   TestRecord{kind=ADD, k=9580, v=-1180486045},
   TestRecord{kind=ADD, k=9683, v=-1625710048},
   TestRecord{kind=ADD, k=9698, v=169574801},
   TestRecord{kind=ADD, k=9918, v=1835226686},
   TestRecord{kind=ADD, k=9922, v=-1687709970},
   TestRecord{kind=ADD, k=9993, v=-63074385},
   TestRecord{kind=ADD, k=9995, v=209840468},
   TestRecord{kind=ADD, k=9996, v=-1926512297},
   TestRecord{kind=ADD, k=9997, v=680790216}]
   [INFO] 
   Error:  Tests run: 410, Failures: 2, Errors: 0, Skipped: 0
   ```



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


JingsongLi commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875433467


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java:
##
@@ -112,26 +120,17 @@ public void delete(String fileName) {
 FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
 }
 
-private class ManifestEntryBulkWriterFactory implements BulkWriter.Factory {
-
-@Override
-public BulkWriter create(FSDataOutputStream out) throws IOException {
-return new BaseBulkWriter<>(writerFactory.create(out), serializer::toRow);
-}
-}
-
 private class ManifestEntryWriter extends BaseFileWriter {
 
-private final FieldStatsCollector statsCollector;
-
+private final FieldStatsCollector fieldStatsCollector;

Review Comment:
   naming: `partitionStatsCollector`.






[GitHub] [flink-table-store] openinx closed pull request #125: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx closed pull request #125: [FLINK-27543] Hide column statistics 
collector inside the file format writer.
URL: https://github.com/apache/flink-table-store/pull/125





[GitHub] [flink-table-store] openinx commented on pull request #125: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on PR #125:
URL: 
https://github.com/apache/flink-table-store/pull/125#issuecomment-1129525292

   Let's close this PR as we've agreed to use another one: 
https://github.com/apache/flink-table-store/pull/117 





[GitHub] [flink-table-store] tsreaper commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

2022-05-17 Thread GitBox


tsreaper commented on PR #121:
URL: 
https://github.com/apache/flink-table-store/pull/121#issuecomment-1129520834

   > How should I configure `ConfigOption MERGE_ENGINE ` to get 
`columnName.aggregate-function = sum` from the parameter `FileStoreOptions` of 
`static FileStoreImpl createWithPrimaryKey` function?
   
   You can get all config options from the `FileStoreOptions.options` member. You might need to add a method to `FileStoreOptions` to fetch out an arbitrary key (a sketch follows the screenshot below).
   
   ```sql
   CREATE TABLE IF NOT EXISTS T (
 j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED
   ) WITH ('merge-engine'='partial-update', 'specialkey'='hahahaha');
   ```
   
   
![20220518110552](https://user-images.githubusercontent.com/19909549/168949470-9d7e88e3-08a5-4666-b064-53b995081702.jpg)
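
   A hypothetical helper sketch for that suggestion (not an existing `FileStoreOptions` method): scan the raw option map for per-column keys such as `columnName.aggregate-function`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class AggregateFunctionOptions {
       private static final String SUFFIX = ".aggregate-function";

       // Map entries like "a.aggregate-function" -> "sum" to column -> function.
       static Map<String, String> byColumn(Map<String, String> options) {
           Map<String, String> result = new HashMap<>();
           for (Map.Entry<String, String> e : options.entrySet()) {
               String key = e.getKey();
               if (key.endsWith(SUFFIX)) {
                   result.put(key.substring(0, key.length() - SUFFIX.length()), e.getValue());
               }
           }
           return result;
       }
   }
   ```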
   
   





[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875418990


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java:
##
@@ -52,23 +55,32 @@ public class ManifestFile {
 private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
 
 private final RowType partitionType;
+private final RowType entryType;
 private final ManifestEntrySerializer serializer;
+private final FieldStatsArraySerializer statsSerializer;
 private final BulkFormat readerFactory;
 private final BulkWriter.Factory writerFactory;
+private final FileStatsExtractor fileStatsExtractor;
 private final FileStorePathFactory pathFactory;
 private final long suggestedFileSize;
 
 private ManifestFile(
 RowType partitionType,
+RowType entryType,

Review Comment:
   Please see: 
https://github.com/apache/flink-table-store/pull/117/files#diff-4b9ebe9adecbf113c86f69fe4ef1c4fbb0395798eefe476e2d6f2de629e09e53R164-R166






[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875418826


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java:
##
@@ -52,23 +55,32 @@ public class ManifestFile {
 private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
 
 private final RowType partitionType;
+private final RowType entryType;
 private final ManifestEntrySerializer serializer;
+private final FieldStatsArraySerializer statsSerializer;
 private final BulkFormat readerFactory;
 private final BulkWriter.Factory writerFactory;
+private final FileStatsExtractor fileStatsExtractor;
 private final FileStorePathFactory pathFactory;
 private final long suggestedFileSize;
 
 private ManifestFile(
 RowType partitionType,
+RowType entryType,

Review Comment:
   Although we don't rely on any metric generated by the `MetricFileWriter`.






[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875417510


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java:
##
@@ -112,26 +124,22 @@ public void delete(String fileName) {
 FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
 }
 
-private class ManifestEntryBulkWriterFactory implements BulkWriter.Factory {
-
-@Override
-public BulkWriter create(FSDataOutputStream out) throws IOException {
-return new BaseBulkWriter<>(writerFactory.create(out), serializer::toRow);
-}
+private FileWriter.Factory createManifestEntryWriterFactory() {

Review Comment:
   Yes, I like this idea.






[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875417184


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java:
##
@@ -52,23 +55,32 @@ public class ManifestFile {
 private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
 
 private final RowType partitionType;
+private final RowType entryType;
 private final ManifestEntrySerializer serializer;
+private final FieldStatsArraySerializer statsSerializer;
 private final BulkFormat readerFactory;
 private final BulkWriter.Factory writerFactory;
+private final FileStatsExtractor fileStatsExtractor;
 private final FileStorePathFactory pathFactory;
 private final long suggestedFileSize;
 
 private ManifestFile(
 RowType partitionType,
+RowType entryType,
 ManifestEntrySerializer serializer,
+FieldStatsArraySerializer statsSerializer,

Review Comment:
   This argument can be removed now.






[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875416965


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java:
##
@@ -52,23 +55,32 @@ public class ManifestFile {
 private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
 
 private final RowType partitionType;
+private final RowType entryType;
 private final ManifestEntrySerializer serializer;
+private final FieldStatsArraySerializer statsSerializer;
 private final BulkFormat readerFactory;
 private final BulkWriter.Factory writerFactory;
+private final FileStatsExtractor fileStatsExtractor;
 private final FileStorePathFactory pathFactory;
 private final long suggestedFileSize;
 
 private ManifestFile(
 RowType partitionType,
+RowType entryType,

Review Comment:
   The reason we changed this `ManifestFile` class is that we changed 
`BaseFileWriter` to use `FileWriter.Factory` (it used `BulkWriter.Factory` 
before), so we had to change the `ManifestEntryWriter` implementation so that 
it can still extend `BaseFileWriter`.
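
   To make the indirection concrete, a simplified sketch of the two factory shapes being compared (hypothetical signatures, not the table store's real interfaces):

   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.file.Path;

   // Before: the writer was created from an already-open output stream.
   interface BulkWriterFactoryShape {
       AutoCloseable create(OutputStream out) throws IOException;
   }

   // After: the factory is handed a path, so BaseFileWriter subclasses such
   // as ManifestEntryWriter are built the same way as the data file writers.
   interface FileWriterFactoryShape {
       AutoCloseable create(Path path) throws IOException;
   }
   ```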






[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


openinx commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875414820


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##
@@ -237,7 +210,14 @@ public RollingKvWriter(Supplier writerFactory, long targetFileSize
 private Supplier createWriterFactory(int level) {
 return () -> {
 try {
-return new KvFileWriter(new KvBulkWriterFactory(), pathFactory.newPath(), level);
+FileWriter.Factory factory =

Review Comment:
   Yes, I agree. It will be clearer to move the factory initialization into 
the constructor.
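
   A rough illustration of that suggestion, with made-up names rather than 
the actual DataFileWriter code:

import java.util.function.Supplier;

class KvWriterHolderSketch<W> {
    // Hypothetical factory interface standing in for FileWriter.Factory.
    interface WriterFactory<X> {
        X create(String path);
    }

    private final WriterFactory<W> writerFactory; // built once, up front

    KvWriterHolderSketch(WriterFactory<W> writerFactory) {
        this.writerFactory = writerFactory;
    }

    // The per-level supplier now just closes over the pre-built factory
    // instead of constructing a new one on every call.
    Supplier<W> createWriterFactory(String path) {
        return () -> writerFactory.create(path);
    }
}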



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-27008) Document the configurable parameters of the helm chart and their default values

2022-05-17 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang reassigned FLINK-27008:
-

Assignee: Nicholas Jiang

> Document the configurable parameters of the helm chart and their default 
> values
> ---
>
> Key: FLINK-27008
> URL: https://issues.apache.org/jira/browse/FLINK-27008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Nicholas Jiang
>Priority: Major
>  Labels: starter
> Fix For: kubernetes-operator-1.0.0
>
>
> We might need a table to document all the configurable parameters of the helm 
> chart and their default values. It could be put in the helm section[1].
> ||Parameters||Description||Default Value||
> |watchNamespaces|List of kubernetes namespaces to watch for FlinkDeployment 
> changes, empty means all namespaces.| |
> |image.repository|The image repository of 
> flink-kubernetes-operator|ghcr.io/apache/flink-kubernetes-operator|
> |...|...| |
> [1]. 
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes

2022-05-17 Thread GitBox


yunfengzhou-hub commented on code in PR #100:
URL: https://github.com/apache/flink-ml/pull/100#discussion_r875396471


##
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.datagenerator.common;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
+import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.common.param.HasFeaturesCol;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasWeightCol;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/** A DataGenerator which creates a table of features, label and weight. */
+public class LabeledPointWithWeightGenerator
+implements InputDataGenerator<LabeledPointWithWeightGenerator>,
+HasFeaturesCol<LabeledPointWithWeightGenerator>,
+HasLabelCol<LabeledPointWithWeightGenerator>,
+HasWeightCol<LabeledPointWithWeightGenerator>,
+HasVectorDim<LabeledPointWithWeightGenerator> {
+
+public static final Param<Integer> FEATURE_ARITY =
+new IntParam(
+"featureArity",
+"Arity of each feature. "
++ "If set to positive value, each feature would be 
an integer in range [0, arity - 1]. "
++ "If set to zero, each feature would be a 
continuous double in range [0, 1).",
+2,
+ParamValidators.gtEq(0));
+
+public static final Param<Integer> LABEL_ARITY =
+new IntParam(
+"labelArity",
+"Arity of label. "
++ "If set to positive value, the label would be an 
integer in range [0, arity - 1]. "
++ "If set to zero, the label would be a continuous 
double in range [0, 1).",
+2,
+ParamValidators.gtEq(0));
+
+public static final Param<Integer> WEIGHT_ARITY =
+new IntParam(
+"weightArity",
+"Arity of weight. "
++ "If set to positive value, the weight would be 
an integer in range [1, arity]. "
++ "If set to zero, weight would be a continuous 
double in range [0, 1).",
+1,
+ParamValidators.gtEq(0));
+
+private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+public LabeledPointWithWeightGenerator() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+public int getFeatureArity() {
+return get(FEATURE_ARITY);
+}
+
+public LabeledPointWithWeightGenerator setFeatureArity(int value) {
+return set(FEATURE_ARITY, value);
+}
+
+public int getLabelArity() {
+return get(LABEL_ARITY);
+}
+
+public LabeledPointWithWeightGenerator setLabelArity(int value) {
+return set(LABEL_ARITY, value);
+}
+
+public int getWeightArity() {
+return get(WEIGHT_ARITY);
+}
+
+public LabeledPointWithWeightGenerator 

[jira] [Updated] (FLINK-27670) Python wrappers for Kinesis Sinks

2022-05-17 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-27670:
-
Fix Version/s: 1.16.0
   (was: 1.15.1)

> Python wrappers for Kinesis Sinks
> -
>
> Key: FLINK-27670
> URL: https://issues.apache.org/jira/browse/FLINK-27670
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Connectors / Kinesis
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.16.0
>
>
> Create Python Wrappers for the new Kinesis Streams sink and the Kinesis 
> Firehose sink.
> An example implementation may be found here 
> [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27670) Python wrappers for Kinesis Sinks

2022-05-17 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-27670:
-
Component/s: API / Python

> Python wrappers for Kinesis Sinks
> -
>
> Key: FLINK-27670
> URL: https://issues.apache.org/jira/browse/FLINK-27670
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Connectors / Kinesis
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.15.1
>
>
> Create Python Wrappers for the new Kinesis Streams sink and the Kinesis 
> Firehose sink.
> An example implementation may be found here 
> [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27674) Elasticsearch6SinkE2ECase test hangs

2022-05-17 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27674:


 Summary: Elasticsearch6SinkE2ECase test hangs
 Key: FLINK-27674
 URL: https://issues.apache.org/jira/browse/FLINK-27674
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-05-17T14:23:20.2220667Z May 17 14:23:20 [INFO] Running 
org.apache.flink.streaming.tests.Elasticsearch6SinkE2ECase
2022-05-17T16:46:42.2757438Z 
==
2022-05-17T16:46:42.2769104Z === WARNING: This task took already 95% of the 
available time budget of 284 minutes ===
2022-05-17T16:46:42.2769752Z 
==
2022-05-17T16:46:42.2777415Z 
==
2022-05-17T16:46:42.2778082Z The following Java processes are running (JPS)
2022-05-17T16:46:42.2778711Z 
==
2022-05-17T16:46:42.4370838Z 366642 surefirebooter7296810110127514313.jar
2022-05-17T16:46:42.4371962Z 323278 Launcher
2022-05-17T16:46:42.4378950Z 384312 Jps
2022-05-17T16:46:42.4452836Z 
==
2022-05-17T16:46:42.4453843Z Printing stack trace of Java process 366642
2022-05-17T16:46:42.4454796Z 
==
2022-05-17T16:46:42.9967200Z 2022-05-17 16:46:42
2022-05-17T16:46:42.9968158Z Full thread dump OpenJDK 64-Bit Server VM 
(25.332-b09 mixed mode):
2022-05-17T16:46:42.9968402Z 
2022-05-17T16:46:42.9968953Z "Attach Listener" #6017 daemon prio=9 os_prio=0 
tid=0x7f4cf0007000 nid=0x5dd53 waiting on condition [0x]
2022-05-17T16:46:42.9969533Zjava.lang.Thread.State: RUNNABLE
2022-05-17T16:46:42.9969714Z 
2022-05-17T16:46:42.9970494Z "ForkJoinPool-1-worker-0" #6011 daemon prio=5 
os_prio=0 tid=0x7f4b4ed92800 nid=0x5cfea waiting on condition 
[0x7f4b35b9b000]
2022-05-17T16:46:42.9971118Zjava.lang.Thread.State: TIMED_WAITING (parking)
2022-05-17T16:46:42.9971752Zat sun.misc.Unsafe.park(Native Method)
2022-05-17T16:46:42.9972494Z- parking to wait for  <0x939d6cd0> (a 
java.util.concurrent.ForkJoinPool)
2022-05-17T16:46:42.9973134Zat 
java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
2022-05-17T16:46:42.9973801Zat 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
2022-05-17T16:46:43.0113835Zat 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
2022-05-17T16:46:43.0114433Z 
2022-05-17T16:46:43.0118656Z "flink-rest-client-netty-thread-1" #6006 daemon 
prio=5 os_prio=0 tid=0x7f4b4ed8f000 nid=0x5bb03 runnable 
[0x7f4b34181000]
2022-05-17T16:46:43.0119406Zjava.lang.Thread.State: RUNNABLE
2022-05-17T16:46:43.0120063Zat 
sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
2022-05-17T16:46:43.0120921Zat 
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
2022-05-17T16:46:43.0121666Zat 
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
2022-05-17T16:46:43.0122507Zat 
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
2022-05-17T16:46:43.0123735Z- locked <0xe4e90c80> (a 
org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySet)
2022-05-17T16:46:43.0124873Z- locked <0xe4e90a90> (a 
java.util.Collections$UnmodifiableSet)
2022-05-17T16:46:43.0125863Z- locked <0xe4e909b8> (a 
sun.nio.ch.EPollSelectorImpl)
2022-05-17T16:46:43.0126648Zat 
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
2022-05-17T16:46:43.0127370Zat 
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
2022-05-17T16:46:43.0128211Zat 
org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:68)
2022-05-17T16:46:43.0129260Zat 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:810)
2022-05-17T16:46:43.0130119Zat 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:457)
2022-05-17T16:46:43.0131134Zat 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
2022-05-17T16:46:43.0137869Zat 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2022-05-17T16:46:43.0138683Zat java.lang.Thread.run(Thread.java:750)
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35749=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a



--
This message was sent by Atlassian Jira

[jira] [Comment Edited] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running

2022-05-17 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538539#comment-17538539
 ] 

Nicholas Jiang edited comment on FLINK-27257 at 5/18/22 1:44 AM:
-

I have discussed this offline with [~wangyang0918]. It's hard to improve the 
isJobRunning implementation because the JobStatusMessages returned by 
ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, so 
there is no way to judge whether all tasks of the job are in a RUNNING 
ExecutionState, nor, for batch jobs, whether the job is really running.

The current idea is that if the error indicates that not all required tasks 
are currently running, the operator keeps triggering the savepoint in 
subsequent reconciliations until it succeeds.

[~gyfora][~wangyang0918] WDYT?


was (Author: nicholasjiang):
I have discussed this offline with [~wangyang0918]. It's hard to improve the 
isJobRunning implementation because the JobStatusMessages returned by 
ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, so 
there is no way to judge whether all tasks of the job are in a RUNNING 
ExecutionState, nor, for batch jobs, whether the job is really running.

The current idea is that if the error is found to be "Not all required tasks 
are currently running", the operator keeps triggering the savepoint in 
subsequent reconciliations until it succeeds.

[~gyfora][~wangyang0918] WDYT?

> Flink kubernetes operator triggers savepoint failed because of not all tasks 
> running
> 
>
> Key: FLINK-27257
> URL: https://issues.apache.org/jira/browse/FLINK-27257
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Nicholas Jiang
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> {code:java}
> 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService       [INFO 
> ][default/flink-example-statemachine] Fetching savepoint result with 
> triggerId: 182d7f176496856d7b33fe2f3767da18
> 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService       
> [ERROR][default/flink-example-statemachine] Savepoint error
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Source: Custom Source (1/2) of job 
>  is not being executed at the moment. 
> Aborting checkpoint. Failure reason: Not all required tasks are currently 
> running.
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at 

[jira] [Commented] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running

2022-05-17 Thread Nicholas Jiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538539#comment-17538539
 ] 

Nicholas Jiang commented on FLINK-27257:


I have discussed this offline with [~wangyang0918]. It's hard to improve the 
isJobRunning implementation because the JobStatusMessages returned by 
ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, so 
there is no way to judge whether all tasks of the job are in a RUNNING 
ExecutionState, nor, for batch jobs, whether the job is really running.

The current idea is that if the error is found to be "Not all required tasks 
are currently running", the operator keeps triggering the savepoint in 
subsequent reconciliations until it succeeds.

[~gyfora][~wangyang0918] WDYT?
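
A minimal sketch of that retry idea, with hypothetical names (not the 
operator's actual API):

{code:java}
final class SavepointRetrySketch {

    /** Stand-in for the service used to talk to the Flink cluster. */
    interface FlinkService {
        // Throws if not all required tasks are RUNNING yet.
        String triggerSavepoint(String jobId) throws Exception;
    }

    private String lastReconciledNonce;

    void reconcile(String jobId, String requestedNonce, FlinkService service) {
        if (requestedNonce == null || requestedNonce.equals(lastReconciledNonce)) {
            return; // no new savepoint requested
        }
        try {
            String triggerId = service.triggerSavepoint(jobId);
            lastReconciledNonce = requestedNonce; // success: stop retrying
            System.out.println("Savepoint triggered: " + triggerId);
        } catch (Exception e) {
            // Deliberately leave lastReconciledNonce unchanged so that the
            // next reconciliation loop attempts the trigger again.
        }
    }
}
{code}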

> Flink kubernetes operator triggers savepoint failed because of not all tasks 
> running
> 
>
> Key: FLINK-27257
> URL: https://issues.apache.org/jira/browse/FLINK-27257
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Nicholas Jiang
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> {code:java}
> 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService       [INFO 
> ][default/flink-example-statemachine] Fetching savepoint result with 
> triggerId: 182d7f176496856d7b33fe2f3767da18
> 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService       
> [ERROR][default/flink-example-statemachine] Savepoint error
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Source: Custom Source (1/2) of job 
>  is not being executed at the moment. 
> Aborting checkpoint. Failure reason: Not all required tasks are currently 
> running.
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> 2022-04-15 02:38:56,693 o.a.f.k.o.o.SavepointObserver  
> [ERROR][default/flink-example-statemachine] Checkpoint triggering task 
> Source: Custom Source (1/2) of job  is not 
> being executed at the moment. Aborting checkpoint. Failure reason: Not all 
> required tasks are currently running. {code}
> How to reproduce?
> Update arbitrary fields(e.g. parallelism) along with 
> {{{}savepointTriggerNonce{}}}.
>  
> The root cause might be the running state return by 
> {{ClusterClient#listJobs()}} does not mean all the 

[jira] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running

2022-05-17 Thread Nicholas Jiang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-27257 ]


Nicholas Jiang deleted comment on FLINK-27257:


was (Author: nicholasjiang):
[~gyfora], sorry for the late reply. I will push a pull request today.

> Flink kubernetes operator triggers savepoint failed because of not all tasks 
> running
> 
>
> Key: FLINK-27257
> URL: https://issues.apache.org/jira/browse/FLINK-27257
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Nicholas Jiang
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> {code:java}
> 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService       [INFO 
> ][default/flink-example-statemachine] Fetching savepoint result with 
> triggerId: 182d7f176496856d7b33fe2f3767da18
> 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService       
> [ERROR][default/flink-example-statemachine] Savepoint error
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Source: Custom Source (1/2) of job 
>  is not being executed at the moment. 
> Aborting checkpoint. Failure reason: Not all required tasks are currently 
> running.
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> 2022-04-15 02:38:56,693 o.a.f.k.o.o.SavepointObserver  
> [ERROR][default/flink-example-statemachine] Checkpoint triggering task 
> Source: Custom Source (1/2) of job  is not 
> being executed at the moment. Aborting checkpoint. Failure reason: Not all 
> required tasks are currently running. {code}
> How to reproduce?
> Update arbitrary fields(e.g. parallelism) along with 
> {{{}savepointTriggerNonce{}}}.
>  
> The root cause might be the running state return by 
> {{ClusterClient#listJobs()}} does not mean all the tasks are running.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes

2022-05-17 Thread GitBox


yunfengzhou-hub commented on code in PR #100:
URL: https://github.com/apache/flink-ml/pull/100#discussion_r875393271


##
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.datagenerator.common;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
+import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.common.param.HasFeaturesCol;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasWeightCol;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/** A DataGenerator which creates a table of features, label and weight. */
+public class LabeledPointWithWeightGenerator
+implements InputDataGenerator<LabeledPointWithWeightGenerator>,
+HasFeaturesCol<LabeledPointWithWeightGenerator>,
+HasLabelCol<LabeledPointWithWeightGenerator>,
+HasWeightCol<LabeledPointWithWeightGenerator>,
+HasVectorDim<LabeledPointWithWeightGenerator> {
+
+public static final Param<Integer> FEATURE_ARITY =
+new IntParam(
+"featureArity",
+"Arity of each feature. "
++ "If set to positive value, each feature would be 
an integer in range [0, arity - 1]. "
++ "If set to zero, each feature would be a 
continuous double in range [0, 1).",
+2,
+ParamValidators.gtEq(0));
+
+public static final Param<Integer> LABEL_ARITY =
+new IntParam(
+"labelArity",
+"Arity of label. "
++ "If set to positive value, the label would be an 
integer in range [0, arity - 1]. "
++ "If set to zero, the label would be a continuous 
double in range [0, 1).",
+2,
+ParamValidators.gtEq(0));
+
+public static final Param<Integer> WEIGHT_ARITY =
+new IntParam(
+"weightArity",
+"Arity of weight. "
++ "If set to positive value, the weight would be 
an integer in range [1, arity]. "
++ "If set to zero, weight would be a continuous 
double in range [0, 1).",
+1,
+ParamValidators.gtEq(0));
+
+private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+public LabeledPointWithWeightGenerator() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+public int getFeatureArity() {
+return get(FEATURE_ARITY);
+}
+
+public LabeledPointWithWeightGenerator setFeatureArity(int value) {
+return set(FEATURE_ARITY, value);
+}
+
+public int getLabelArity() {
+return get(LABEL_ARITY);
+}
+
+public LabeledPointWithWeightGenerator setLabelArity(int value) {
+return set(LABEL_ARITY, value);
+}
+
+public int getWeightArity() {
+return get(WEIGHT_ARITY);
+}
+
+public LabeledPointWithWeightGenerator 

[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes

2022-05-17 Thread GitBox


yunfengzhou-hub commented on code in PR #100:
URL: https://github.com/apache/flink-ml/pull/100#discussion_r875392599


##
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.datagenerator.common;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
+import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.common.param.HasFeaturesCol;
+import org.apache.flink.ml.common.param.HasLabelCol;
+import org.apache.flink.ml.common.param.HasWeightCol;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/** A DataGenerator which creates a table of features, label and weight. */
+public class LabeledPointWithWeightGenerator
+implements InputDataGenerator<LabeledPointWithWeightGenerator>,
+HasFeaturesCol<LabeledPointWithWeightGenerator>,
+HasLabelCol<LabeledPointWithWeightGenerator>,
+HasWeightCol<LabeledPointWithWeightGenerator>,
+HasVectorDim<LabeledPointWithWeightGenerator> {
+
+public static final Param<Integer> FEATURE_ARITY =
+new IntParam(
+"featureArity",
+"Arity of each feature. "
++ "If set to positive value, each feature would be 
an integer in range [0, arity - 1]. "
++ "If set to zero, each feature would be a 
continuous double in range [0, 1).",
+2,
+ParamValidators.gtEq(0));
+
+public static final Param<Integer> LABEL_ARITY =
+new IntParam(
+"labelArity",
+"Arity of label. "
++ "If set to positive value, the label would be an 
integer in range [0, arity - 1]. "
++ "If set to zero, the label would be a continuous 
double in range [0, 1).",
+2,
+ParamValidators.gtEq(0));
+
+public static final Param<Integer> WEIGHT_ARITY =

Review Comment:
   I agree. I'll make the change.
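
   For context, a hedged usage sketch of the generator shown in this diff; it 
assumes a setWeightArity setter symmetric to the other setters (the quoted 
code is cut off before it), so treat the names as illustrative:

import org.apache.flink.ml.benchmark.datagenerator.common.LabeledPointWithWeightGenerator;

class GeneratorUsageSketch {
    public static void main(String[] args) {
        LabeledPointWithWeightGenerator generator =
                new LabeledPointWithWeightGenerator()
                        .setFeatureArity(0)  // continuous features in [0, 1)
                        .setLabelArity(2)    // integer labels in {0, 1}
                        .setWeightArity(1);  // every weight equals 1 (assumed setter)
        // The generator would then be handed to the benchmark framework,
        // which asks it to produce the input Table(s).
        System.out.println("configured generator: " + generator);
    }
}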



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-26914) Notice files improvements in flink-kubernetes-operator

2022-05-17 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang reassigned FLINK-26914:
-

Assignee: Yang Wang

> Notice files improvements in flink-kubernetes-operator
> --
>
> Key: FLINK-26914
> URL: https://issues.apache.org/jira/browse/FLINK-26914
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
>  Labels: starter
> Fix For: kubernetes-operator-1.0.0
>
>
> See the discussion in this PR[1].
> Having the jars contain the NOTICE files is insufficient as discoverability 
> within the image is basically 0. We should extract & merge the NOTICE files 
> for all jars, similarly to what we do for {{{}flink-dist{}}}.
>  
> Another small improvement is flink dependencies could be removed from 
> {{{}flink-kubernetes-operator/resources/META-INF/NOTICE{}}}.
>  
> [1]. https://github.com/apache/flink-kubernetes-operator/pull/127
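
A hedged sketch of the "extract & merge" step as a standalone program 
(illustrative only, not the actual tooling used for flink-dist):

{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

class NoticeMergeSketch {
    public static void main(String[] args) throws IOException {
        Set<String> merged = new LinkedHashSet<>(); // dedupes identical lines
        try (DirectoryStream<Path> jars =
                Files.newDirectoryStream(Paths.get(args[0]), "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jf = new JarFile(jar.toFile())) {
                    JarEntry notice = jf.getJarEntry("META-INF/NOTICE");
                    if (notice == null) {
                        continue; // jar ships no NOTICE
                    }
                    try (BufferedReader r = new BufferedReader(new InputStreamReader(
                            jf.getInputStream(notice), StandardCharsets.UTF_8))) {
                        r.lines().forEach(merged::add);
                    }
                }
            }
        }
        Files.write(Paths.get("NOTICE-merged"), merged);
    }
}
{code}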



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] snuyanzin commented on pull request #19754: [FLINK-27673][tests] Migrate flink-table-api-scala to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on PR #19754:
URL: https://github.com/apache/flink/pull/19754#issuecomment-1129451355

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on PR #19753:
URL: https://github.com/apache/flink/pull/19753#issuecomment-1129436869

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

2022-05-17 Thread GitBox


xinbinhuang commented on PR #17873:
URL: https://github.com/apache/flink/pull/17873#issuecomment-1129364011

   @KarmaGYZ sorry for getting back to this so late. Finally got some time to 
sort out my laptop's build setup. Can you take a look at the PR again? thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

2022-05-17 Thread GitBox


xinbinhuang commented on PR #17873:
URL: https://github.com/apache/flink/pull/17873#issuecomment-1129363293

   @flinkbot run azure
   
   looks like the CI is stuck? (the `Build Flink` stage has been running for > 
4 hrs in `e2c_ci_test2`.)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on code in PR #19716:
URL: https://github.com/apache/flink/pull/19716#discussion_r875296706


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java:
##
@@ -108,55 +104,56 @@ public class FileSinkCompactionSwitchITCase extends 
TestLogger {
 
 protected static final int NUM_BUCKETS = 4;
 
-@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+@TempDir private static java.nio.file.Path tmpDir;
 
-@Rule public final SharedObjects sharedObjects = SharedObjects.create();
+@RegisterExtension final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on code in PR #19716:
URL: https://github.com/apache/flink/pull/19716#discussion_r875295088


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java:
##
@@ -23,26 +23,39 @@
 import org.apache.flink.connector.file.table.PartitionWriter.Context;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.utils.LegacyRowResource;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowUtils;
 
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link PartitionWriter}s. */
-public class PartitionWriterTest {
+class PartitionWriterTest {
+
+@TempDir private java.nio.file.Path tmpDir;
 
-@Rule public final LegacyRowResource usesLegacyRows = 
LegacyRowResource.INSTANCE;

Review Comment:
   OK, thanks. I've made these methods public and started calling them.
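
   As a sketch of the resulting JUnit 5 lifecycle wiring (assuming the 
now-public methods toggle RowUtils.USE_LEGACY_TO_STRING, which is what the 
LegacyRowResource rule did):

import org.apache.flink.types.RowUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

class LegacyRowLifecycleSketch {
    @BeforeEach
    void enableLegacyRowToString() {
        RowUtils.USE_LEGACY_TO_STRING = true; // what the old rule enabled
    }

    @AfterEach
    void restoreDefaultRowToString() {
        RowUtils.USE_LEGACY_TO_STRING = false; // restore the default
    }
}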



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on code in PR #19716:
URL: https://github.com/apache/flink/pull/19716#discussion_r875294629


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -147,39 +145,40 @@ public void testStaticPartition() throws Exception {
 testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 
1L));
 testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 
1L));
 testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 
1L));
-assertEquals(1, getFileContentByPath(tmpFile).size());
+assertThat(getFileContentByPath(tmpPath)).hasSize(1);
 }
 
 ref.get().finalizeGlobal(1);
-Map<File, String> content = getFileContentByPath(outputFile);
-assertEquals(1, content.size());
-assertEquals("c=p1", 
content.keySet().iterator().next().getParentFile().getName());
-assertEquals("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n", 
content.values().iterator().next());
-assertFalse(new File(tmpFile.toURI()).exists());
+Map<File, String> content = getFileContentByPath(outputPath);
+assertThat(content).hasSize(1);
+
assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
+assertThat(content.values().iterator().next())
+.isEqualTo("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
+assertThat(new File(tmpPath.toUri())).doesNotExist();
 }
 
 @Test
-public void testDynamicPartition() throws Exception {
+void testDynamicPartition() throws Exception {
 AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
 try (OneInputStreamOperatorTestHarness testHarness =
 createSink(false, true, false, new LinkedHashMap<>(), ref)) {
 writeUnorderedRecords(testHarness);
-assertEquals(2, getFileContentByPath(tmpFile).size());
+assertThat(getFileContentByPath(tmpPath)).hasSize(2);
 }
 
 ref.get().finalizeGlobal(1);
-Map<File, String> content = getFileContentByPath(outputFile);
+Map<File, String> content = getFileContentByPath(outputPath);
 Map<String, String> sortedContent = new TreeMap<>();
 content.forEach((file, s) -> 
sortedContent.put(file.getParentFile().getName(), s));
 
-assertEquals(2, sortedContent.size());
-assertEquals("a1,1\n" + "a2,2\n" + "a3,3\n", 
sortedContent.get("c=p1"));
-assertEquals("a2,2\n", sortedContent.get("c=p2"));
-assertFalse(new File(tmpFile.toURI()).exists());
+assertThat(sortedContent).hasSize(2);
+assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + 
"a3,3\n");
+assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19754: [FLINK-27673][tests] Migrate flink-table-api-scala to JUnit5

2022-05-17 Thread GitBox


flinkbot commented on PR #19754:
URL: https://github.com/apache/flink/pull/19754#issuecomment-1129356861

   
   ## CI report:
   
   * 3c204d8b0a0493bfe50633bdb22b68ba76fe6c84 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on code in PR #19716:
URL: https://github.com/apache/flink/pull/19716#discussion_r875293489


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java:
##
@@ -47,20 +44,17 @@ public abstract class FileSinkITBase extends TestLogger {
 
 protected static final double FAILOVER_RATIO = 0.4;
 
-@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
-@Parameterized.Parameter public boolean triggerFailover;
-
-@Parameterized.Parameters(name = "triggerFailover = {0}")
-public static Collection<Object[]> params() {
-return Arrays.asList(new Object[] {false}, new Object[] {true});
+public static Stream<Boolean> params() {

Review Comment:
   yep, done
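
   For reference, the JUnit 5 counterpart of the removed @Parameterized setup 
typically looks like the following sketch (class and test names are 
illustrative):

import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class FailoverParamsSketch {
    static Stream<Boolean> params() {
        return Stream.of(false, true);
    }

    @ParameterizedTest(name = "triggerFailover = {0}")
    @MethodSource("params")
    void testWriting(boolean triggerFailover) {
        // the test body uses triggerFailover here
    }
}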



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on code in PR #19716:
URL: https://github.com/apache/flink/pull/19716#discussion_r875293274


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java:
##
@@ -58,17 +56,15 @@
 import java.util.stream.LongStream;
 
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It 
trigger a savepoint for
  * the {@link StreamingFileSink} job and restore the {@link FileSink} job from 
the savepoint taken.
  */
-public class FileSinkMigrationITCase extends TestLogger {
+class FileSinkMigrationITCase {
 
-@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-
-@Rule public final SharedObjects sharedObjects = SharedObjects.create();
+@RegisterExtension final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27673) [JUnit5 Migration] Module: flink-table-api-scala

2022-05-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27673:
---
Labels: pull-request-available  (was: )

> [JUnit5 Migration] Module: flink-table-api-scala
> 
>
> Key: FLINK-27673
> URL: https://issues.apache.org/jira/browse/FLINK-27673
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] snuyanzin opened a new pull request, #19754: [FLINK-27673][tests] Migrate flink-table-api-scala to JUnit5

2022-05-17 Thread GitBox


snuyanzin opened a new pull request, #19754:
URL: https://github.com/apache/flink/pull/19754

   ## What is the purpose of the change
   
   Update the flink-table-api-scala module to AssertJ and JUnit 5 following the 
[JUnit 5 Migration 
Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit)
   
   ## Brief change log
   
   Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest
   
   ## Verifying this change
   
   This change is a code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  - The S3 file system connector: (no)
   
   ## Documentation
   
  - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-17 Thread GitBox


rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r875289701


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##
@@ -227,6 +229,14 @@ private static void registerSharedState(
 for (KeyedStateHandle stateHandle : stateHandles) {
 if (stateHandle != null) {
 stateHandle.registerSharedStates(sharedStateRegistry, 
checkpointID);
+// Registering state handle to the given sharedStateRegistry 
serves two purposes:
+// 1. let sharedStateRegistry be responsible for cleaning the 
state handle,

Review Comment:
   The ID is formally necessary to implement `equals` (and `hashCode`). 
Otherwise, each time this ID is used, `SharedStateRegistry` will try to 
discard it.
   The discard is a no-op, but deletions are currently scheduled one-by-one to 
the IO thread pool, so it might delay other IO tasks and/or lengthen the 
queue inside the pool.
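
   To illustrate the equals/hashCode point with a toy example (this is not 
Flink's actual SharedStateRegistry):

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class StateHandleId {
    private final String id;

    StateHandleId(String id) {
        this.id = id;
    }

    // Without value-based equality, two handles pointing at the same state
    // would register as distinct entries, and the registry would schedule a
    // (no-op) discard for the "duplicate" on the IO thread pool each time.
    @Override
    public boolean equals(Object o) {
        return o instanceof StateHandleId && ((StateHandleId) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

final class ToySharedStateRegistry {
    private final ConcurrentMap<StateHandleId, Object> entries = new ConcurrentHashMap<>();

    /** Returns the already-registered handle if the ID was seen before. */
    Object registerReference(StateHandleId id, Object handle) {
        Object existing = entries.putIfAbsent(id, handle);
        return existing != null ? existing : handle;
    }
}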



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27669) [JUnit5 Migration] Module: flink-file-sink-common

2022-05-17 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-27669:

Component/s: Tests

> [JUnit5 Migration] Module: flink-file-sink-common
> -
>
> Key: FLINK-27669
> URL: https://issues.apache.org/jira/browse/FLINK-27669
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27672) [JUnit5 Migration] Module: flink-table-common

2022-05-17 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-27672:

Component/s: Table SQL / API
 Tests

> [JUnit5 Migration] Module: flink-table-common
> -
>
> Key: FLINK-27672
> URL: https://issues.apache.org/jira/browse/FLINK-27672
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27673) [JUnit5 Migration] Module: flink-table-api-scala

2022-05-17 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27673:
---

 Summary: [JUnit5 Migration] Module: flink-table-api-scala
 Key: FLINK-27673
 URL: https://issues.apache.org/jira/browse/FLINK-27673
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Tests
Reporter: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] flinkbot commented on pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-17 Thread GitBox


flinkbot commented on PR #19753:
URL: https://github.com/apache/flink/pull/19753#issuecomment-1129347186

   
   ## CI report:
   
   * af3374e392677cc6ab2161dbd25185e0215999d5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on code in PR #19753:
URL: https://github.com/apache/flink/pull/19753#discussion_r875285230


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorTestBase.java:
##
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.util.Preconditions;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test base for testing {@link Descriptor} together with {@link 
DescriptorValidator}. */
-public abstract class DescriptorTestBase {

Review Comment:
   Removed as it is abstract and has no subclasses and no usages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27672) [JUnit5 Migration] Module: flink-table-common

2022-05-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27672:
---
Labels: pull-request-available  (was: )

> [JUnit5 Migration] Module: flink-table-common
> -
>
> Key: FLINK-27672
> URL: https://issues.apache.org/jira/browse/FLINK-27672
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] snuyanzin opened a new pull request, #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-17 Thread GitBox


snuyanzin opened a new pull request, #19753:
URL: https://github.com/apache/flink/pull/19753

   ## What is the purpose of the change
   
   Update the flink-table-common module to AssertJ and JUnit 5 following the 
[JUnit 5 Migration 
Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit)
   
   
   ## Brief change log
   
   Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest
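   
   For reviewers unfamiliar with the mechanical part of the migration, a minimal 
before/after sketch (the test class and assertion are made up for illustration):
   
       // Before: JUnit 4 + JUnit assertions
       import org.junit.Test;
       import static org.junit.Assert.assertEquals;
   
       public class MigrationExampleTest {
           @Test
           public void testConcat() {
               assertEquals("ab", "a" + "b");
           }
       }
   
       // After: JUnit 5 + AssertJ
       import org.junit.jupiter.api.Test;
       import static org.assertj.core.api.Assertions.assertThat;
   
       class MigrationExampleTest {
           @Test
           void testConcat() {
               assertThat("a" + "b").isEqualTo("ab");
           }
       }
   
   Note that JUnit 5 no longer requires test classes and methods to be public.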
   
   ## Verifying this change
   
   This change is a code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27672) [JUnit5 Migration] Module: flink-table-common

2022-05-17 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27672:
---

 Summary: [JUnit5 Migration] Module: flink-table-common
 Key: FLINK-27672
 URL: https://issues.apache.org/jira/browse/FLINK-27672
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27650) First environment variable of top level pod template is lost

2022-05-17 Thread Simon Paradis (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538399#comment-17538399
 ] 

Simon Paradis commented on FLINK-27650:
---

Okay, then we can define all variables at the topmost level and override them as needed.
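
A sketch of the workaround in pod template form (abridged; only the env list 
matters here, and FLINK_COORDINATES_DUMMY is the sacrificial first entry 
mentioned in the issue below):

{noformat}
podTemplate:
  spec:
    containers:
      - name: flink-main-container
        env:
          - name: FLINK_COORDINATES_DUMMY   # first entry is dropped by the bug
            value: "unused"
          - name: LOG_FORMAT                # real variables start here
            value: "json"
{noformat}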

> First environment variable of top level pod template is lost
> 
>
> Key: FLINK-27650
> URL: https://issues.apache.org/jira/browse/FLINK-27650
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0
>Reporter: Simon Paradis
>Priority: Major
> Fix For: kubernetes-operator-1.0.0
>
> Attachments: flink-27650.yaml
>
>
> I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* 
> to deploy Flink 1.14.4 job. The deployment manifest makes use of pod template 
> feature to inject environment variable to control structured JSON logging.
> I noticed the first defined environment variable is never injected into the 
> JobManager nor TaskManager pods. The work around is to define a dummy env. 
> var.
> Here's the manifest template. This gets processed by a tool that will first 
> expand ${ENV_VAR} reference with values provided by our CI pipeline. We 
> should not have to create the FLINK_COORDINATES_DUMMY env var.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-web] MartijnVisser merged pull request #539: add low latency techniques blog post part1

2022-05-17 Thread GitBox


MartijnVisser merged PR #539:
URL: https://github.com/apache/flink-web/pull/539


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser merged pull request #19748: [hotfix][python] Fix lint-python typo

2022-05-17 Thread GitBox


MartijnVisser merged PR #19748:
URL: https://github.com/apache/flink/pull/19748


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27665) Optimize event triggering on DeploymentFailedExceptions

2022-05-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27665:
---
Labels: pull-request-available  (was: )

> Optimize event triggering on DeploymentFailedExceptions
> ---
>
> Key: FLINK-27665
> URL: https://issues.apache.org/jira/browse/FLINK-27665
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-0.1.0
>Reporter: Matyas Orhidi
>Assignee: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
> Attachments: image-2022-05-17-12-08-42-597.png, 
> image-2022-05-17-12-13-19-489.png
>
>
> Use `EventUtils` when handling `DeploymentFailedExceptions` to avoid 
> appending new events on every reconcile loop:
> !image-2022-05-17-12-13-19-489.png|width=1365,height=575!
>  
>  
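
The deduplication idea in a rough sketch (simplified; not the operator's actual 
EventUtils API): key events by resource and reason, and bump a counter on 
repeats instead of appending a new event on every reconcile loop.

{code}
import java.util.HashMap;
import java.util.Map;

class EventDedupSketch {
    static final class Event {
        final String type, reason, message;
        int count = 1;
        Event(String type, String reason, String message) {
            this.type = type; this.reason = reason; this.message = message;
        }
    }

    private final Map<String, Event> recorded = new HashMap<>();

    /** Create the event once; on repeats only increment its counter. */
    void triggerEvent(String resourceUid, String type, String reason, String message) {
        recorded.merge(
                resourceUid + "/" + reason,
                new Event(type, reason, message),
                (existing, fresh) -> {
                    existing.count++; // same condition observed again
                    return existing;
                });
    }
}
{code}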



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #220: [FLINK-27665] Optimize event triggering on DeploymentFailedExceptions

2022-05-17 Thread GitBox


morhidi commented on PR #220:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/220#issuecomment-1129103487

   cc @wangyang0918 @Aitozi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #220: [FLINK-27665] Optimize event triggering on DeploymentFailedExceptions

2022-05-17 Thread GitBox


morhidi opened a new pull request, #220:
URL: https://github.com/apache/flink-kubernetes-operator/pull/220

   - Separated the event-related logic from the DeploymentFailedExceptions
   - Introduced a new component type for JobManagerDeployment, reporting 
DeploymentFailedExceptions
   - Reduced the event types to Warning and Normal only, see the [API 
docs](https://github.com/kubernetes/api/blob/master/events/v1/types.go#L84)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #19750: [FLINK-27669][tests] Migrate flink-file-sink-common to JUnit5

2022-05-17 Thread GitBox


snuyanzin commented on PR #19750:
URL: https://github.com/apache/flink/pull/19750#issuecomment-1129088650

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27671) Publish SNAPSHOT docker images

2022-05-17 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538319#comment-17538319
 ] 

Márton Balassi commented on FLINK-27671:


Fyi: 
[https://github.com/apache/flink-kubernetes-operator/blob/main/.github/workflows/docker_push.yml]

Please add support for the ARM architecture too, as the above GHA implementation does.
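
For reference, a minimal sketch of the multi-arch part (action versions and the 
tag are illustrative):

{noformat}
- uses: docker/setup-qemu-action@v2
- uses: docker/setup-buildx-action@v2
- uses: docker/build-push-action@v2
  with:
    push: true
    platforms: linux/amd64,linux/arm64
    tags: ghcr.io/apache/flink:1.16-SNAPSHOT
{noformat}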

> Publish SNAPSHOT docker images
> --
>
> Key: FLINK-27671
> URL: https://issues.apache.org/jira/browse/FLINK-27671
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>
> A discussion on the Flink-Dev mailing list [1] concluded that it is desirable 
> to add the ability to publish SNAPSHOT Docker images of Apache Flink to GHCR. 
> [1] [https://lists.apache.org/thread/0h60qz8vrw980n1vscz3pxcsv3c3h24m]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] flinkbot commented on pull request #19752: [FLINK-27340][tests] Migrate module flink-python to JUnit5

2022-05-17 Thread GitBox


flinkbot commented on PR #19752:
URL: https://github.com/apache/flink/pull/19752#issuecomment-1129072997

   
   ## CI report:
   
   * c3f34db24204a4d4ae5544688139b3de3f143fc1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27340) [JUnit5 Migration] Module: flink-python

2022-05-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27340:
---
Labels: pull-request-available  (was: )

> [JUnit5 Migration] Module: flink-python
> ---
>
> Key: FLINK-27340
> URL: https://issues.apache.org/jira/browse/FLINK-27340
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] snuyanzin opened a new pull request, #19752: [FLINK-27340][tests] Migrate module flink-python to JUnit5

2022-05-17 Thread GitBox


snuyanzin opened a new pull request, #19752:
URL: https://github.com/apache/flink/pull/19752

   ## What is the purpose of the change
   
   Update the flink-python module to AssertJ and JUnit 5 following the [JUnit 5 
Migration 
Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit)
   
   
   ## Brief change log
   
   Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest
   
   
   ## Verifying this change
   
   This change is a code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive): ( no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
 - The S3 file system connector: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] afedulov commented on pull request #116: Publish snapshot images

2022-05-17 Thread GitBox


afedulov commented on PR #116:
URL: https://github.com/apache/flink-docker/pull/116#issuecomment-1129067361

   @zentol I cleaned up the PR and resolved conflicts with master, PTAL.
   The main question for me is where this change should be merged. IIUC the 
issue is that the add-custom.sh script is not available on master, while 
scheduled runs can only be executed there. Would you be OK with adding 
add-custom.sh to master in that case? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser merged pull request #19749: [FLINK-27167][test] change the junit-jupiter dependency scope to compile

2022-05-17 Thread GitBox


MartijnVisser merged PR #19749:
URL: https://github.com/apache/flink/pull/19749


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27671) Publish SNAPSHOT docker images

2022-05-17 Thread Alexander Fedulov (Jira)
Alexander Fedulov created FLINK-27671:
-

 Summary: Publish SNAPSHOT docker images
 Key: FLINK-27671
 URL: https://issues.apache.org/jira/browse/FLINK-27671
 Project: Flink
  Issue Type: Improvement
Reporter: Alexander Fedulov
Assignee: Alexander Fedulov


A discussion on the Flink-Dev mailing list [1] concluded that it is desirable 
to add the ability to publish SNAPSHOT Docker images of Apache Flink to GHCR. 


[1] [https://lists.apache.org/thread/0h60qz8vrw980n1vscz3pxcsv3c3h24m]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #187: [FLINK-27443] Create standalone mode parameters and decorators for JM and TMs

2022-05-17 Thread GitBox


gyfora commented on code in PR #187:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/187#discussion_r874958360


##
flink-kubernetes-standalone-cluster/pom.xml:
##
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<artifactId>flink-kubernetes-operator-parent</artifactId>
+<groupId>org.apache.flink</groupId>
+<version>1.0-SNAPSHOT</version>
+<relativePath>..</relativePath>
+</parent>
+
+<artifactId>flink-kubernetes-standalone-cluster</artifactId>

Review Comment:
   Maybe a more suitable name would be simply `flink-kubernetes-standalone`



##
flink-kubernetes-standalone-cluster/pom.xml:
##
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<artifactId>flink-kubernetes-operator-parent</artifactId>
+<groupId>org.apache.flink</groupId>
+<version>1.0-SNAPSHOT</version>
+<relativePath>..</relativePath>
+</parent>
+
+<artifactId>flink-kubernetes-standalone-cluster</artifactId>
+<name>Flink Kubernetes Standalone Kube Client</name>

Review Comment:
   I think we can simply name this `Flink Kubernetes Standalone`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] echauchot commented on a diff in pull request #19680: [FLINK-27457] Implement flush() logic in Cassandra output formats

2022-05-17 Thread GitBox


echauchot commented on code in PR #19680:
URL: https://github.com/apache/flink/pull/19680#discussion_r874964198


##
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##
@@ -17,130 +17,191 @@
 
 package org.apache.flink.batch.connectors.cassandra;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.cassandra.utils.SinkUtils;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.util.Preconditions;
 
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra.
+ * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra using
+ * output formats.
  *
  * @param <OUT> Type of the elements to write.
  */
-public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> {
+public abstract class CassandraOutputFormatBase<OUT, V> extends RichOutputFormat<OUT> {
 private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormatBase.class);
 
-private final String insertQuery;
 private final ClusterBuilder builder;
+private Semaphore semaphore;
+private Duration maxConcurrentRequestsTimeout = Duration.ofMillis(Long.MAX_VALUE);
+private int maxConcurrentRequests = Integer.MAX_VALUE;
 
 private transient Cluster cluster;
-private transient Session session;
-private transient PreparedStatement prepared;
-private transient FutureCallback<ResultSet> callback;
-private transient Throwable exception = null;
+protected transient Session session;
+private transient FutureCallback<V> callback;
+private AtomicReference<Throwable> throwable;
 
-public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder) {
-Preconditions.checkArgument(
-!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
+public CassandraOutputFormatBase(ClusterBuilder builder) {
 Preconditions.checkNotNull(builder, "Builder cannot be null");
-
-this.insertQuery = insertQuery;
 this.builder = builder;
 }
 
+/**
+ * Sets the maximum allowed number of concurrent requests for this output format.
+ *
+ * @param maxConcurrentRequestsTimeout timeout duration when acquiring a permit to execute
+ */
+public void setMaxConcurrentRequestsTimeout(Duration maxConcurrentRequestsTimeout) {

Review Comment:
   :+1: 
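   
   For readers following along, the permit-based throttling being added here 
boils down to the pattern below (simplified sketch; `executeAsync` is a 
hypothetical stand-in for the actual Cassandra write call):
   
       private final int maxConcurrentRequests = 100;
       private final Duration maxConcurrentRequestsTimeout = Duration.ofSeconds(10);
       private final Semaphore semaphore = new Semaphore(maxConcurrentRequests);
   
       void send(Object record) throws Exception {
           // Block until an in-flight slot frees up, or give up after the timeout.
           if (!semaphore.tryAcquire(
                   maxConcurrentRequestsTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
               throw new TimeoutException("No permit within " + maxConcurrentRequestsTimeout);
           }
           ListenableFuture<Object> future = executeAsync(record); // hypothetical helper
           Futures.addCallback(
                   future,
                   new FutureCallback<Object>() {
                       @Override public void onSuccess(Object result) { semaphore.release(); }
                       @Override public void onFailure(Throwable t) { semaphore.release(); }
                   },
                   MoreExecutors.directExecutor());
       }
   
       void flush() throws InterruptedException {
           // All permits available again means no request is in flight any more.
           semaphore.acquire(maxConcurrentRequests);
           semaphore.release(maxConcurrentRequests);
       }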



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-docker] afedulov commented on a diff in pull request #116: Publish snapshot images

2022-05-17 Thread GitBox


afedulov commented on code in PR #116:
URL: https://github.com/apache/flink-docker/pull/116#discussion_r866937358


##
.github/workflows/snapshot.yml:
##
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "Publish SNAPSHOTs"
+
+on:
+  # schedule:
+  #   - cron: '0 0 * * *' # Deploy every day
+  push:
+branches:
+  - snapshot-dev-master
+
+env:
+  REGISTRY: ghcr.io
+  VERSION: 1.16-SNAPSHOT
+
+jobs:
+  ci:
+uses: ./.github/workflows/ci.yml
+  snapshot:
+needs: ci
+runs-on: ubuntu-latest
+permissions:
+  contents: read
+  packages: write
+strategy:
+  matrix:
+config:
+  - java_version: 8
+  - java_version: 11
+steps:
+  - name: Set env
+run: |
+  repo=${{ github.repository }}# 
apache/flink-docker
+  echo "IMAGE_REPO=$(echo ${repo##*/})" >> $GITHUB_ENV # apache
+  echo "OWNER=$(echo ${repo%%/*})" >> $GITHUB_ENV  # flink-docker
+  - uses: actions/checkout@v3
+  - name: Variables
+run: | 
+  echo "OWNER: $OWNER"
+  echo "IMAGE_REPO: $IMAGE_REPO"
+  - name: Prepare Dockerfiles
+run: |
+  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-${VERSION}-bin-scala_2.12.tgz; -j 
${{ matrix.config.java_version }} -n ${VERSION}-scala_2.12-java${{ 
matrix.config.java_version }}

Review Comment:
   I thought that we were going to put the snapshot.yaml into the dev-master 
branch. If I understand you correctly, this won't work with the scheduled runs, 
because they can only be triggered from master. Is my understanding correct?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-17 Thread GitBox


fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874963782


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -47,23 +50,33 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
 /** All registered state objects by an artificial key */
 private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
 
+/** All registered checkpoints */
+private final Set<CompletedCheckpoint> registeredCheckpoints;

Review Comment:
   I replaced `Set` with `Map` temporarily. 
   
   > Do you think it could/should be refactored so that 
CompletedCheckpointStore "transfers the ownership" of subsumed checkpoints to 
SharedStateRegistry?
   > E.g. by passing them to unregisterUnusedState.
   
   I think it would work well. I will try to pass 
`DefaultCompletedCheckpointStore.completedCheckpoints` to `unregisterUnusedState` 
later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-17 Thread GitBox


fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874957754


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##
@@ -227,6 +229,14 @@ private static void registerSharedState(
 for (KeyedStateHandle stateHandle : stateHandles) {
 if (stateHandle != null) {
 stateHandle.registerSharedStates(sharedStateRegistry, 
checkpointID);
+// Registering state handle to the given sharedStateRegistry 
serves two purposes:
+// 1. let sharedStateRegistry be responsible for cleaning the 
state handle,
+// 2. update the status of the checkpoint in 
sharedStateRegistry to which the state
+// handle belongs
+sharedStateRegistry.registerReference(
+new 
SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()),
+new StreamStateHandleWrapper(stateHandle),

Review Comment:
   > can we get rid of the same call in ChangelogStateBackendHandle?
   
   We can't: the key of the changelog handle and the key of the KeyedStateHandle 
inside the changelog handle are not the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-17 Thread GitBox


fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874950094


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java:
##
@@ -137,23 +134,19 @@ public CompletedCheckpoint 
addCheckpointAndSubsumeOldestOne(
 
 completedCheckpoints.addLast(checkpoint);
 
+// Register checkpoint to SharedStateRegistry, and checkpoint would be 
discarded during
+// unregistering.
+getSharedStateRegistry().registerCompletedCheckpoint(checkpoint);
+
+// Remove completed checkpoint from queue and 
checkpointStateHandleStore, not discard.
 Optional subsume =
 CheckpointSubsumeHelper.subsume(
 completedCheckpoints,
 maxNumberOfCheckpointsToRetain,
-completedCheckpoint ->
-tryRemoveCompletedCheckpoint(
-completedCheckpoint,
-
completedCheckpoint.shouldBeDiscardedOnSubsume(),
-checkpointsCleaner,
-postCleanup));
+completedCheckpoint -> 
tryRemove(completedCheckpoint.getCheckpointID()));

Review Comment:
   Yes, it is a duplicate call to clean up the checkpoint. I pass an empty subsume 
action to `unregisterUnusedState` to avoid the duplicate cleanup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-17 Thread GitBox


fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874946254


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -137,12 +150,26 @@ public StreamStateHandle registerReference(
 return entry.stateHandle;
 }
 
+@Override
+public void registerCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+synchronized (registeredCheckpoints) {
+LOG.trace("Register checkpoint {}.", checkpoint.getCheckpointID());
+registeredCheckpoints.add(checkpoint);
+}
+}
+
+@Override
+public void setPostCleanAction(Runnable postCleanAction) {
+this.postCleanAction = postCleanAction;

Review Comment:
   Nice suggestion. I pass the `subsumAction` as an argument to 
`unregisterUnusedState`, and the subsume action contains the `postCleanAction`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27670) Python wrappers for Kinesis Sinks

2022-05-17 Thread Zichen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538256#comment-17538256
 ] 

Zichen Liu commented on FLINK-27670:


This task is the Sink portion of the parent task FLINK-21966

> Python wrappers for Kinesis Sinks
> -
>
> Key: FLINK-27670
> URL: https://issues.apache.org/jira/browse/FLINK-27670
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Zichen Liu
>Priority: Major
> Fix For: 1.15.1
>
>
> Create Python Wrappers for the new Kinesis Streams sink and the Kinesis 
> Firehose sink.
> An example implementation may be found here 
> [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] afedulov commented on pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-17 Thread GitBox


afedulov commented on PR #19660:
URL: https://github.com/apache/flink/pull/19660#issuecomment-1128983193

   connector-hive module looks good apart from the potential improvement noted in 
https://github.com/apache/flink/pull/19660#discussion_r874931314. Can be merged 
once that is addressed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-17 Thread GitBox


fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r874932077


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##
@@ -227,6 +229,14 @@ private static void registerSharedState(
 for (KeyedStateHandle stateHandle : stateHandles) {
 if (stateHandle != null) {
 stateHandle.registerSharedStates(sharedStateRegistry, 
checkpointID);
+// Registering state handle to the given sharedStateRegistry 
serves two purposes:
+// 1. let sharedStateRegistry be responsible for cleaning the 
state handle,

Review Comment:
   Nice suggestion! I think the ID is also unnecessary, because the 
SharedStateRegistryKey already contains the state handle ID.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-17 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r874931314


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java:
##
@@ -437,12 +431,12 @@ public void testCustomPartitionCommitPolicyNotFound() {
 
testStreamingWriteWithCustomPartitionCommitPolicy(customCommitPolicyClassName);
 fail("ExecutionException expected");
 } catch (Exception e) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(

Review Comment:
   We could use `hasStackTraceContaining` in the following check to avoid the 
complex `satisfies` logic.
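   
   Something along these lines (sketch; using the policy class name as the 
expected stack trace fragment is illustrative):
   
       assertThatThrownBy(
                       () -> testStreamingWriteWithCustomPartitionCommitPolicy(
                               customCommitPolicyClassName))
               .hasStackTraceContaining(customCommitPolicyClassName);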



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-24433) "No space left on device" in Azure e2e tests

2022-05-17 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-24433:
---
Fix Version/s: (was: 1.14.5)
   (was: 1.15.1)

> "No space left on device" in Azure e2e tests
> 
>
> Key: FLINK-24433
> URL: https://issues.apache.org/jira/browse/FLINK-24433
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Martijn Visser
>Priority: Blocker
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Fix For: 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772
> {code}
> Sep 30 17:08:42 Job has been submitted with JobID 
> 5594c18e128a328ede39cfa59cb3cb07
> Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from 
> StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> Sep 30 17:08:56 java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error.,  Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: 
> Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z
>  ##[error]No space left on device
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-24433) "No space left on device" in Azure e2e tests

2022-05-17 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-24433.
--
Resolution: Fixed

> "No space left on device" in Azure e2e tests
> 
>
> Key: FLINK-24433
> URL: https://issues.apache.org/jira/browse/FLINK-24433
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Dawid Wysakowicz
>Assignee: Martijn Visser
>Priority: Blocker
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Fix For: 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772
> {code}
> Sep 30 17:08:42 Job has been submitted with JobID 
> 5594c18e128a328ede39cfa59cb3cb07
> Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from 
> StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> Sep 30 17:08:56 java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error.,  Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: 
> Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z
>  ##[error]No space left on device
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-24433) "No space left on device" in Azure e2e tests

2022-05-17 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-24433:
---
Affects Version/s: 1.16.0

> "No space left on device" in Azure e2e tests
> 
>
> Key: FLINK-24433
> URL: https://issues.apache.org/jira/browse/FLINK-24433
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Dawid Wysakowicz
>Assignee: Martijn Visser
>Priority: Blocker
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Fix For: 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772
> {code}
> Sep 30 17:08:42 Job has been submitted with JobID 
> 5594c18e128a328ede39cfa59cb3cb07
> Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from 
> StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> Sep 30 17:08:56 java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error.,  Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: 
> Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z
>  ##[error]No space left on device
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-24433) "No space left on device" in Azure e2e tests

2022-05-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538252#comment-17538252
 ] 

Martijn Visser commented on FLINK-24433:


After more investigation, we found out that an Elasticsearch cluster goes into 
read-only mode by default once its disk watermarks are exceeded (the lowest one 
kicks in at 85% used disk space). On the Azure E2E tests, this is quite common. 
We therefore disable the entire disk allocation decider, as documented on 
https://www.elastic.co/guide/en/elasticsearch/reference/6.2/disk-allocator.html

Fixed in master: 86cf5247c564a6a17a41c0a858e838ca975c9a22
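
For reference, the decider is controlled by a single cluster-level setting; a 
sketch of the relevant knob (the exact wiring in the e2e setup may differ):

{noformat}
cluster.routing.allocation.disk.threshold_enabled: false
{noformat}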


> "No space left on device" in Azure e2e tests
> 
>
> Key: FLINK-24433
> URL: https://issues.apache.org/jira/browse/FLINK-24433
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines
>Affects Versions: 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Martijn Visser
>Priority: Blocker
>  Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
> Fix For: 1.16.0, 1.14.5, 1.15.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772
> {code}
> Sep 30 17:08:42 Job has been submitted with JobID 
> 5594c18e128a328ede39cfa59cb3cb07
> Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from 
> StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN  
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
> exception occurred when fetching query results
> Sep 30 17:08:56 java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
> error.,  Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: 
> Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937)
> Sep 30 17:08:56   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z
>  ##[error]No space left on device
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] MartijnVisser merged pull request #19751: [FLINK-24433][Connector][Elasticsearch] Disable Elasticsearch disk allocation decider

2022-05-17 Thread GitBox


MartijnVisser merged PR #19751:
URL: https://github.com/apache/flink/pull/19751


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.

2022-05-17 Thread GitBox


JingsongLi commented on code in PR #117:
URL: https://github.com/apache/flink-table-store/pull/117#discussion_r874898190


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.function.Function;
+
+/**
+ * An {@link FileWriter} to write generic record and generate {@link Metric} once closing it.
+ *
+ * @param <T> generic record type.
+ */
+public class MetricFileWriter<T> implements FileWriter<T, Metric> {
+private static final Logger LOG = LoggerFactory.getLogger(MetricFileWriter.class);
+
+private final BulkWriter<RowData> writer;
+private final Function<T, RowData> converter;
+private final FSDataOutputStream out;
+private final Path path;
+private final FileStatsExtractor fileStatsExtractor;
+
+private FieldStatsCollector fieldStatsCollector = null;
+
+private long recordCount;
+private boolean closed = false;
+
+private MetricFileWriter(
+BulkWriter<RowData> writer,
+Function<T, RowData> converter,
+FSDataOutputStream out,
+Path path,
+RowType writeSchema,
+FileStatsExtractor fileStatsExtractor) {
+this.writer = writer;
+this.converter = converter;
+this.out = out;
+this.path = path;
+
+this.fileStatsExtractor = fileStatsExtractor;
+if (this.fileStatsExtractor == null) {
+this.fieldStatsCollector = new FieldStatsCollector(writeSchema);
+}
+
+this.recordCount = 0L;
+}
+
+@Override
+public void write(T record) throws IOException {
+RowData rowData = converter.apply(record);
+writer.addElement(rowData);
+
+if (fieldStatsCollector != null) {
+fieldStatsCollector.collect(rowData);
+}
+
+recordCount += 1;
+}
+
+@Override
+public long recordCount() {
+return recordCount;
+}
+
+@Override
+public long length() {
+try {
+return out.getPos();
+} catch (IOException e) {
+throw new UncheckedIOException(e);
+}
+}
+
+@Override
+public void abort() {
+try {
+close();
+} catch (IOException e) {
+throw new UncheckedIOException(e);
+}
+
+FileUtils.deleteOrWarn(path);
+}
+
+@Override
+public Metric result() throws IOException {
+Preconditions.checkState(closed, "Cannot access metric unless the 
writer is closed.");
+
+FieldStats[] stats;
+if (fileStatsExtractor != null) {
+stats = fileStatsExtractor.extract(path);
+} else {
+stats = fieldStatsCollector.extractFieldStats();
+}
+
+return new Metric(stats, recordCount);
+}
+
+@Override
+public void close() throws IOException {
+if (!closed) {
+if (writer != null) {
+writer.flush();
+writer.finish();
+}
+
+if (out != null) {
+out.flush();
+out.close();
+}
+
+closed = true;
+}
+}
+
+public static <T> FileWriter.Factory<T, Metric> createFactory(
+BulkWriter.Factory<RowData> factory,
+Function<T, RowData> converter,
+RowType writeSchema,
+FileStatsExtractor 

[jira] [Created] (FLINK-27670) Python wrappers for Kinesis Sinks

2022-05-17 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-27670:
--

 Summary: Python wrappers for Kinesis Sinks
 Key: FLINK-27670
 URL: https://issues.apache.org/jira/browse/FLINK-27670
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Zichen Liu
 Fix For: 1.15.1


Create Python Wrappers for the new Kinesis Streams sink and the Kinesis 
Firehose sink.

An example implementation may be found here 
[https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.
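
A rough sketch of the wrapper shape (hypothetical Python-side API; the Java 
class and builder methods referenced are the 1.15 KinesisStreamsSink ones):

{code}
from pyflink.java_gateway import get_gateway


class KinesisStreamsSink(object):
    """Thin Python wrapper delegating to the Java KinesisStreamsSink (sketch)."""

    def __init__(self, j_sink):
        self._j_sink = j_sink

    @staticmethod
    def for_stream(stream_name, serialization_schema):
        jvm = get_gateway().jvm
        j_sink = (
            jvm.org.apache.flink.connector.kinesis.sink.KinesisStreamsSink.builder()
            .setStreamName(stream_name)
            .setSerializationSchema(serialization_schema._j_serialization_schema)
            .build()
        )
        return KinesisStreamsSink(j_sink)
{code}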



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

2022-05-17 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538236#comment-17538236
 ] 

Chesnay Schepler commented on FLINK-27640:
--

FYI this is because you are using a newer maven version than recommended, which 
blocks http repositories.

The conjars repo is defined in the hive poms. We could override those, but it's 
not really ideal because this is only required for hive 2.X, and not 3.X that 
we also support.
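
For reference, the blocking comes from the mirror that Maven >= 3.8.1 ships in 
its default settings.xml; users who accept the risk of plain-http downloads can 
override it locally (sketch):

{noformat}
<mirror>
  <id>maven-default-http-blocker</id>
  <mirrorOf>external:http:*</mirrorOf>
  <url>http://0.0.0.0/</url>
  <!-- shipped as true; setting it to false re-allows http repositories -->
  <blocked>false</blocked>
</mirror>
{noformat}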



> Flink not compiling, flink-connector-hive_2.12 is missing jhyde 
> pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
> --
>
> Key: FLINK-27640
> URL: https://issues.apache.org/jira/browse/FLINK-27640
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Critical
>
> When clean installing whole project after cleaning local {{.m2}} directory I 
> encountered the following error when compiling flink-connector-hive_2.12:
> {noformat}
> [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could 
> not resolve dependencies for project 
> org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to 
> collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
> artifact descriptor for 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
> artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
> maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for 
> repositories: [conjars (http://conjars.org/repo, default, 
> releases+snapshots), apache.snapshots 
> (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
> {noformat}
> I've solved this by adding 
> {noformat}
> <repository>
>   <id>spring-repo-plugins</id>
>   <url>https://repo.spring.io/ui/native/plugins-release/</url>
> </repository>
> {noformat}
> to ~/.m2/settings.xml file. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] flinkbot commented on pull request #19751: [FLINK-24433][Connector][Elasticsearch] Disable Elasticsearch disk allocation decider

2022-05-17 Thread GitBox


flinkbot commented on PR #19751:
URL: https://github.com/apache/flink/pull/19751#issuecomment-1128945587

   
   ## CI report:
   
   * cbbdef5ae62531f244db5b841b38bd781ac1adff UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser commented on pull request #19751: [FLINK-24433][Connector][Elasticsearch] Disable Elasticsearch disk allocation decider

2022-05-17 Thread GitBox


MartijnVisser commented on PR #19751:
URL: https://github.com/apache/flink/pull/19751#issuecomment-1128939651

   Successful pipeline run can be found in 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35747=results


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


