[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber
zhuzhurk commented on code in PR #19747: URL: https://github.com/apache/flink/pull/19747#discussion_r875493129 ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java: ## @@ -68,19 +117,32 @@ public boolean equals(Object obj) { return true; } else if (obj != null && obj.getClass() == getClass()) { ExecutionAttemptID that = (ExecutionAttemptID) obj; -return that.executionAttemptId.equals(this.executionAttemptId); +return that.executionGraphId.equals(this.executionGraphId) +&& that.executionVertexId.equals(this.executionVertexId) +&& that.attemptNumber == this.attemptNumber; } else { return false; } } @Override public int hashCode() { -return executionAttemptId.hashCode(); +return Objects.hash(executionGraphId, executionVertexId, attemptNumber); } @Override public String toString() { -return executionAttemptId.toString(); +return String.format( +"%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber); +} + +public String getLogString() { +if (DefaultExecutionGraph.LOG.isDebugEnabled()) { +return toString(); +} else { +return String.format( +"%s_%s_%d", +executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber); Review Comment: > logging the EG ID is fine, but this can't be done in isolation. There must be a way to correlate this id with a specific job submission; for example we could log the EG ID when the EG for a given job was created. Totally agree. Will add a log when creating the EG. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
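Below is a minimal, self-contained sketch of the pattern in this diff. It models an ID built from three parts, with equals() and hashCode() computed over the same fields and a shortened log string. Several details are assumptions made for illustration and are not Flink's ExecutionAttemptID:

- the class name CompositeAttemptId;
- the field types (UUID and String);
- the debugEnabled parameter, which stands in for DefaultExecutionGraph.LOG.isDebugEnabled().

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a composite attempt ID (not Flink's ExecutionAttemptID). */
final class CompositeAttemptId {
    private final UUID graphId;       // assumed type for the execution graph ID
    private final String vertexId;    // assumed type for the execution vertex ID
    private final int attemptNumber;

    CompositeAttemptId(UUID graphId, String vertexId, int attemptNumber) {
        this.graphId = Objects.requireNonNull(graphId);
        this.vertexId = Objects.requireNonNull(vertexId);
        this.attemptNumber = attemptNumber;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        CompositeAttemptId that = (CompositeAttemptId) obj;
        // Two attempts are equal only if all three components match.
        return graphId.equals(that.graphId)
                && vertexId.equals(that.vertexId)
                && attemptNumber == that.attemptNumber;
    }

    @Override
    public int hashCode() {
        // Combines exactly the fields compared in equals().
        return Objects.hash(graphId, vertexId, attemptNumber);
    }

    @Override
    public String toString() {
        return String.format("%s_%s_%d", graphId, vertexId, attemptNumber);
    }

    /** Full ID when debugging; otherwise only the first 4 characters of the graph ID. */
    String getLogString(boolean debugEnabled) {
        if (debugEnabled) {
            return toString();
        }
        return String.format(
                "%s_%s_%d", graphId.toString().substring(0, 4), vertexId, attemptNumber);
    }
}
```

The shortened form keeps log lines compact. As the review points out, it is only useful if the full graph ID is logged once when the graph is created, so that the prefix can be correlated with a job submission.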
[jira] [Commented] (FLINK-17295) Refactor the ExecutionAttemptID to consist of ExecutionVertexID and attemptNumber
[ https://issues.apache.org/jira/browse/FLINK-17295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538581#comment-17538581 ] Zhu Zhu commented on FLINK-17295: - The benchmark value is the size of a serialized TDD. The difference between the two proposals is that each execution in #1 has its own unique AbstractID, while all executions in #2 share the same ExecutionGraphID. Therefore, given that *P* is the parallelism, a TDD in #1 contains *P* unique {*}AbstractID{*}s (16 bytes each), while a TDD in #2 contains *P* {*}references{*} (4 bytes each) to the ExecutionGraphID. > Refactor the ExecutionAttemptID to consist of ExecutionVertexID and > attemptNumber > - > > Key: FLINK-17295 > URL: https://issues.apache.org/jira/browse/FLINK-17295 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Yangze Guo >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > > Make the ExecutionAttemptID being composed of (ExecutionVertexID, > attemptNumber). -- This message was sent by Atlassian Jira (v8.20.7#820007)
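The comparison above can be turned into a rough worked example. It uses only the per-item sizes quoted in the comment: 16 bytes per AbstractID and 4 bytes per reference. It deliberately ignores all other TDD fields, as well as the one full copy of the shared ExecutionGraphID that #2 still has to serialize. TddIdSizeEstimate is a hypothetical helper, not part of Flink.

```java
/** Hypothetical back-of-the-envelope estimate of ID bytes per serialized TDD. */
final class TddIdSizeEstimate {
    static final int ABSTRACT_ID_BYTES = 16; // size quoted for one AbstractID
    static final int REFERENCE_BYTES = 4;    // size quoted for one reference

    /** Proposal #1: P unique AbstractIDs. */
    static long proposal1Bytes(int parallelism) {
        return (long) parallelism * ABSTRACT_ID_BYTES;
    }

    /** Proposal #2: P references to one shared ExecutionGraphID. */
    static long proposal2Bytes(int parallelism) {
        return (long) parallelism * REFERENCE_BYTES;
    }
}
```

For P = 1000 this gives 16,000 versus 4,000 bytes, a 4x reduction in this part of the payload.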
[jira] [Assigned] (FLINK-27675) Improve manual savepoint tracking
[ https://issues.apache.org/jira/browse/FLINK-27675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-27675: -- Assignee: Matyas Orhidi > Improve manual savepoint tracking > - > > Key: FLINK-27675 > URL: https://issues.apache.org/jira/browse/FLINK-27675 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Matyas Orhidi >Priority: Blocker > Fix For: kubernetes-operator-1.0.0 > > > There are 2 problems with the manual savepoint result observing logic that > can prevent the reconciler from making progress with the deployment > (recoveries, upgrades, etc.). > # Whenever the jobmanager deployment is not in READY state or the job itself > is not RUNNING, the trigger info must be reset and we should not try to query > it anymore. Flink will not retry the savepoint if the job fails or is restarted anyway. > # If there is a permanent error when fetching the savepoint status (such as: > There is no savepoint operation with triggerId=xxx for job ) we should simply > reset the trigger. These errors will never go away on their own and will > simply cause the deployment to get stuck in observing/waiting for a savepoint > to complete
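The two rules above can be sketched as one decision function. This is a hypothetical model, not the operator's code. The following are all assumptions:

- the JmStatus and Action enums;
- the string form of the job state;
- the isPermanent() check, which only recognizes the quoted "There is no savepoint operation with triggerId" message as permanent.

```java
import java.util.Optional;

/** Hypothetical model of how a pending manual savepoint trigger could be handled. */
final class SavepointTrackingSketch {

    /** Assumed JobManager deployment states; only READY matters here. */
    enum JmStatus { READY, DEPLOYING, MISSING, ERROR }

    enum Action { KEEP_POLLING, RESET_TRIGGER, COMPLETE }

    /** Assumption: only this known message is treated as a permanent error. */
    static boolean isPermanent(String error) {
        return error.contains("There is no savepoint operation with triggerId");
    }

    static Action decide(
            JmStatus jmStatus, String jobState, Optional<String> fetchError, boolean completed) {
        // Rule 1: Flink does not retry the savepoint after a failure or restart,
        // so outside READY + RUNNING the trigger can never complete.
        if (jmStatus != JmStatus.READY || !"RUNNING".equals(jobState)) {
            return Action.RESET_TRIGGER;
        }
        // Rule 2: a permanent error never goes away, so polling again would block the deployment.
        if (fetchError.isPresent()) {
            return isPermanent(fetchError.get()) ? Action.RESET_TRIGGER : Action.KEEP_POLLING;
        }
        return completed ? Action.COMPLETE : Action.KEEP_POLLING;
    }
}
```

Errors that are not recognized as permanent, such as a transient connection failure, keep the trigger and are polled again on the next reconciliation.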
[jira] [Comment Edited] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running
[ https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538539#comment-17538539 ] Nicholas Jiang edited comment on FLINK-27257 at 5/18/22 5:21 AM: - I discussed this offline with [~wangyang0918]. It's hard to improve the isJobRunning implementation because the JobStatusMessages returned by ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, so there is no way to judge whether all tasks of the job are in the RUNNING ExecutionState, nor, for batch jobs, whether the job is really running. The current idea is that if the savepoint fails because not all required tasks are currently running, the operator keeps triggering the savepoint in subsequent reconciliations until it is triggered successfully. [~gyfora], [~wangyang0918] WDYT? was (Author: nicholasjiang): I have offline discussed with [~wangyang0918]. It's hard to improve the isJobRunning implementation because the returned JobStatusMessages of ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, which cause that there is no way to judge whether all the ExecutionState of tasks are running in the job and for batch tasks how to judge whether the job is really running. The current idea is that if the error is found to be not all required tasks are currently running, then continue to trigger savepoint in the next reconciliation until it is successfully triggered. [~gyfora][~wangyang0918] WDYT? 
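The proposed retry behaviour can be sketched as follows, assuming the operator can see the failure message of a trigger attempt. If the message contains the "Not all required tasks are currently running" text from the CheckpointException quoted in this issue, the trigger stays pending and is retried on the next reconciliation. SavepointRetrySketch and its Next enum are hypothetical names.

```java
/** Hypothetical sketch: retry a savepoint trigger that failed because tasks were not yet running. */
final class SavepointRetrySketch {

    // Text taken from the CheckpointException message in the log quoted in the issue.
    static final String TASKS_NOT_RUNNING = "Not all required tasks are currently running";

    enum Next { DONE, RETRY_NEXT_RECONCILIATION, FAIL }

    static Next onTriggerResult(boolean succeeded, String errorMessage) {
        if (succeeded) {
            return Next.DONE;
        }
        if (errorMessage != null && errorMessage.contains(TASKS_NOT_RUNNING)) {
            // Tasks may still be deploying: keep the trigger pending and try again later.
            return Next.RETRY_NEXT_RECONCILIATION;
        }
        return Next.FAIL;
    }
}
```

Matching on message text keeps the sketch short. A real implementation would more likely inspect the exception type or a structured failure reason, if one is exposed.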
> Flink kubernetes operator triggers savepoint failed because of not all tasks > running > > > Key: FLINK-27257 > URL: https://issues.apache.org/jira/browse/FLINK-27257 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Nicholas Jiang >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > {code:java} > 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService [INFO > ][default/flink-example-statemachine] Fetching savepoint result with > triggerId: 182d7f176496856d7b33fe2f3767da18 > 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService > [ERROR][default/flink-example-statemachine] Savepoint error > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint > triggering task Source: Custom Source (1/2) of job > is not being executed at the moment. > Aborting checkpoint. Failure reason: Not all required tasks are currently > running. > at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143) > at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at
[jira] [Created] (FLINK-27675) Improve manual savepoint tracking
Gyula Fora created FLINK-27675: -- Summary: Improve manual savepoint tracking Key: FLINK-27675 URL: https://issues.apache.org/jira/browse/FLINK-27675 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.0.0 There are 2 problems with the manual savepoint result observing logic that can prevent the reconciler from making progress with the deployment (recoveries, upgrades, etc.). # Whenever the jobmanager deployment is not in READY state or the job itself is not RUNNING, the trigger info must be reset and we should not try to query it anymore. Flink will not retry the savepoint if the job fails or is restarted anyway. # If there is a permanent error when fetching the savepoint status (such as: There is no savepoint operation with triggerId=xxx for job ) we should simply reset the trigger. These errors will never go away on their own and will simply cause the deployment to get stuck in observing/waiting for a savepoint to complete
[GitHub] [flink] cun8cun8 closed pull request #19740: [FLINK-26480]Support window_all in Python DataStream API
cun8cun8 closed pull request #19740: [FLINK-26480]Support window_all in Python DataStream API URL: https://github.com/apache/flink/pull/19740
[GitHub] [flink] cun8cun8 closed pull request #19755: [FLINK-26480]Support window_all in Python DataStream API
cun8cun8 closed pull request #19755: [FLINK-26480]Support window_all in Python DataStream API URL: https://github.com/apache/flink/pull/19755
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875460374 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java: ## @@ -237,7 +210,14 @@ public RollingKvWriter(Supplier writerFactory, long targetFileSize private Supplier createWriterFactory(int level) { return () -> { try { -return new KvFileWriter(new KvBulkWriterFactory(), pathFactory.newPath(), level); +FileWriter.Factory factory = Review Comment: I decided to revert this change (i.e. moving the factory initialization into the constructor) because it breaks the unit test (I think the cause is the reuse of `KeyValueSerializer`). After applying this change: https://github.com/apache/flink-table-store/pull/117/commits/0dd583045be08cc19990c4c98370bab1aad3ddde, the [unit test](https://github.com/apache/flink-table-store/pull/117#issuecomment-1129526584) passes.
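The comment suspects that reusing `KeyValueSerializer` is what broke the test. The following hypothetical sketch shows one way sharing a stateful, buffer-reusing serializer across writers can corrupt output, and why creating it inside the supplier (once per writer) avoids the problem. None of these classes are Flink Table Store code:

- ReusingSerializer;
- SketchWriter;
- WriterFactories.

They only model the "create once in the constructor" versus "create per writer" choice.

```java
import java.util.function.Supplier;

/** Hypothetical serializer that reuses one internal buffer for every record. */
final class ReusingSerializer {
    private final StringBuilder reuse = new StringBuilder();

    /** Returns a view into the reused buffer, valid only until the next call. */
    CharSequence serialize(String key, String value) {
        reuse.setLength(0);
        return reuse.append(key).append('=').append(value);
    }
}

/** Hypothetical writer that holds one pending record until flush(). */
final class SketchWriter {
    private final ReusingSerializer serializer;
    private CharSequence pending; // points into the serializer's buffer

    SketchWriter(ReusingSerializer serializer) {
        this.serializer = serializer;
    }

    void write(String key, String value) {
        pending = serializer.serialize(key, value);
    }

    String flush() {
        String out = pending.toString();
        pending = null;
        return out;
    }
}

final class WriterFactories {
    /** One serializer created up front and shared by every writer (the reverted variant). */
    static Supplier<SketchWriter> sharedSerializer() {
        ReusingSerializer shared = new ReusingSerializer();
        return () -> new SketchWriter(shared);
    }

    /** A fresh serializer created inside the supplier for each writer. */
    static Supplier<SketchWriter> serializerPerWriter() {
        return () -> new SketchWriter(new ReusingSerializer());
    }
}
```

With the shared variant, a write on a second writer overwrites the pending record of the first. With the per-writer variant, each writer keeps its own record.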
[jira] [Assigned] (FLINK-27666) Cover manual savepoint triggering in E2E tests
[ https://issues.apache.org/jira/browse/FLINK-27666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-27666: -- Assignee: Aitozi > Cover manual savepoint triggering in E2E tests > -- > > Key: FLINK-27666 > URL: https://issues.apache.org/jira/browse/FLINK-27666 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Aitozi >Priority: Major > > We should extend our e2e tests so that they cover manual savepoint triggering > using the savepointTriggerNonce. > We should verify that the savepoint was triggered and recorded in the status, and that the > trigger information was cleared afterwards.
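The three checks the e2e test should make can be expressed against a small in-memory model. This is a sketch under assumptions, not the operator's implementation. It assumes a savepoint is triggered whenever savepointTriggerNonce changes to a new non-null value. The class, its fields and the trigger-ID format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical in-memory model of nonce-based manual savepoint triggering. */
final class NonceTriggerSketch {
    private Long lastReconciledNonce;                        // nonce of the last reconciled spec
    String pendingTriggerId;                                 // trigger info; null when nothing is pending
    final List<String> savepointHistory = new ArrayList<>(); // savepoints recorded in the status
    private int triggerCounter;

    /** A non-null nonce that differs from the last reconciled one triggers a savepoint. */
    void reconcile(Long specNonce) {
        if (specNonce != null && !Objects.equals(specNonce, lastReconciledNonce)) {
            pendingTriggerId = "trigger-" + (++triggerCounter);
        }
        lastReconciledNonce = specNonce;
    }

    /** When the pending savepoint completes, record it and clear the trigger info. */
    void observeCompleted(String savepointLocation) {
        if (pendingTriggerId != null) {
            savepointHistory.add(savepointLocation);
            pendingTriggerId = null;
        }
    }
}
```

An e2e test would check the same three things against the real FlinkDeployment status:

- a trigger appears after the nonce changes;
- a savepoint is recorded;
- the trigger info is empty afterwards.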
[jira] [Updated] (FLINK-27643) Document new deployment lifecycle features for the operator
[ https://issues.apache.org/jira/browse/FLINK-27643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27643: --- Labels: pull-request-available (was: ) > Document new deployment lifecycle features for the operator > --- > > Key: FLINK-27643 > URL: https://issues.apache.org/jira/browse/FLINK-27643 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > > > We should document the changes and new features to the core lifecycle > management logic, including: > * JM Deployment Recovery > * Rollbacks > * Any changed upgrade behavior > * >
[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #219: [FLINK-27643] Add new features to lifecycle management docs
Aitozi commented on code in PR #219: URL: https://github.com/apache/flink-kubernetes-operator/pull/219#discussion_r875455510 ## docs/content/docs/custom-resource/job-management.md: ## @@ -65,33 +66,38 @@ kubectl delete flinkdeployment my-deployment ``` {{< hint danger >}} -Deleting a deployment will remove all checkpoint and status information. Future deployments will from an empty state unless manually overriden by the user. +Deleting a deployment will remove all checkpoint and status information. Future deployments will from an empty state unless manually overridden by the user. {{< /hint >}} ## Stateful and stateless application upgrades -When the spec changes for a FlinkDeployment the running Application or Session cluster must be upgraded. +When the spec changes for FlinkDeployment and FlinkSessionJob resources, the running application must be upgraded. In order to do this the operator will stop the currently running job (unless already suspended) and redeploy it using the latest spec and state carried over from the previous run for stateful applications. Users have full control on how state should be managed when stopping and restoring stateful applications using the `upgradeMode` setting of the JobSpec. 
-Supported values:`stateless`, `savepoint`, `last-state` +Supported values: `stateless`, `savepoint`, `last-state` The `upgradeMode` setting controls both the stop and restore mechanisms as detailed in the following table: -| | Stateless | Last State | Savepoint | -| | -- | | | -| Config Requirement | None | Checkpointing & Kubernetes HA Enabled | Savepoint directory defined | -| Job Status Requirement | None* | None* | Job Running | -| Suspend Mechanism | Cancel / Delete | Delete Flink deployment (keep HA metadata) | Cancel with savepoint | -| Restore Mechanism | Deploy from empty state | Recover last state using HA metadata | Restore From savepoint | +|| Stateless | Last State | Savepoint | +||-||| +| Config Requirement | None| Checkpointing & Kubernetes HA Enabled | Checkpoint/Savepoint directory defined | +| Job Status Requirement | None| HA metadata available | Job Running* | +| Suspend Mechanism | Cancel / Delete | Delete Flink deployment (keep HA metadata) | Cancel with savepoint | +| Restore Mechanism | Deploy from empty state | Recover last state using HA metadata | Restore From savepoint | +| Production Use | Not recommended | Recommended | Recommended| -*\*In general no update can be executed while a savepoint operation is in progress* -The three different upgrade modes are intended to support different use-cases: - - stateless: Stateless applications, prototyping - - last-state: Suitable for most stateful production applications. Quick upgrades in any application state (even for failing jobs), does not require a healthy job. Requires Flink Kubernetes HA configuration (see example below). - - savepoint: Suitable for forking, migrating applications. Requires a healthy running job as it requires a savepoint operation before shutdown. 
+*\*When Kubernetes HA is enabled the `savepoint` upgrade mode may fall back to the `last-state` behaviour in cases where the job is in an unhealthy state.* + +The three upgrade modes are intended to support different scenarios: Review Comment: extra space
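The upgradeMode table in the proposed docs can be read as a small decision function, sketched here. It is a hypothetical encoding of the table and its footnote, not the operator's code. The enum names are made up. Two points are assumptions:

- the BLOCKED outcome, for when neither the job status requirement nor the footnote's fallback applies;
- the requirement that HA metadata be available for the fallback.

```java
/** Hypothetical encoding of the upgradeMode table in the proposed docs. */
final class UpgradeModeSketch {

    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    enum Suspend { CANCEL, DELETE_KEEP_HA_METADATA, CANCEL_WITH_SAVEPOINT, BLOCKED }

    static Suspend suspendMechanism(
            UpgradeMode mode, boolean jobRunning, boolean haEnabled, boolean haMetadataAvailable) {
        switch (mode) {
            case STATELESS:
                // No config or job status requirement: cancel / delete.
                return Suspend.CANCEL;
            case LAST_STATE:
                // Requires HA metadata to recover the last state.
                return haMetadataAvailable ? Suspend.DELETE_KEEP_HA_METADATA : Suspend.BLOCKED;
            case SAVEPOINT:
                if (jobRunning) {
                    return Suspend.CANCEL_WITH_SAVEPOINT;
                }
                // Footnote: with Kubernetes HA, an unhealthy job may fall back to last-state.
                return haEnabled && haMetadataAvailable
                        ? Suspend.DELETE_KEEP_HA_METADATA
                        : Suspend.BLOCKED;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

Keeping the fallback inside the SAVEPOINT branch mirrors the footnote: the mode stays `savepoint`, and only the suspend mechanism changes when the job is unhealthy.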
[jira] [Commented] (FLINK-27666) Cover manual savepoint triggering in E2E tests
[ https://issues.apache.org/jira/browse/FLINK-27666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538567#comment-17538567 ] Aitozi commented on FLINK-27666: I could work on this. Please assign it to me; I will try to push a PR today. > Cover manual savepoint triggering in E2E tests > -- > > Key: FLINK-27666 > URL: https://issues.apache.org/jira/browse/FLINK-27666 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > > We should extend our e2e tests so that they cover manual savepoint triggering > using the savepointTriggerNonce. > We should verify that the savepoint was triggered and recorded in the status, and that the > trigger information was cleared afterwards.
[GitHub] [flink] flinkbot commented on pull request #19755: [FLINK-26480]Support window_all in Python DataStream API
flinkbot commented on PR #19755: URL: https://github.com/apache/flink/pull/19755#issuecomment-1129540427 ## CI report: * ec83162a1f16488d6060dd977f9b5168d0e8f16f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] cun8cun8 opened a new pull request, #19755: [FLINK-26480]Support window_all in Python DataStream API
cun8cun8 opened a new pull request, #19755: URL: https://github.com/apache/flink/pull/19755 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink
[ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538560#comment-17538560 ] Zhiwen Sun commented on FLINK-27663: I have changed the component to {{Connectors/Kafka}}. Flink is missing a unit test for the following method: {code:java} org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema#deserialize(org.apache.kafka.clients.consumer.ConsumerRecord, org.apache.flink.util.Collector){code} > upsert-kafka can't process delete message from upsert-kafka sink > > > Key: FLINK-27663 > URL: https://issues.apache.org/jira/browse/FLINK-27663 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0, 1.13.6, 1.14.4 >Reporter: Zhiwen Sun >Priority: Major > > upsert-kafka writes DELETE data as Kafka messages with null values (indicating a > tombstone for the key). > But when upsert-kafka is used as a source table to consume Kafka messages written > by the upsert-kafka sink, DELETE messages are ignored. > > Related SQL: > > > {code:java} > create table order_system_log( > id bigint, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'test_use', > 'properties.bootstrap.servers' = 'your broker', > 'properties.group.id' = 'your group id', > 'value.json.fail-on-missing-field' = 'false', > 'value.json.ignore-parse-errors' = 'true', > 'key.json.fail-on-missing-field' = 'false', > 'key.json.ignore-parse-errors' = 'true', > 'key.format' = 'json', > 'value.format' = 'json' > ); > select > * > from > order_system_log > ; > {code} > > > The problem may be caused by DeserializationSchema#deserialize: > this method does not collect a record when the subclass's deserialize returns null. > >
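The reporter's hypothesis can be illustrated with a simplified model. For reference, Flink's default DeserializationSchema#deserialize(byte[], Collector) only calls collect(...) when the single-argument deserialize(...) returns a non-null value. A value deserializer that maps a tombstone to null would therefore emit nothing.

Everything below is hypothetical: TombstoneSketch, Kind, Row and the String-typed key and value. It does not reflect how DynamicKafkaDeserializationSchema is actually implemented. It only contrasts dropping a tombstone with turning it into a DELETE for its key.

```java
import java.util.function.Consumer;

/** Hypothetical, simplified model of reading upsert-kafka records (not Flink's classes). */
final class TombstoneSketch {

    /** Assumed simplified change kinds; Flink's own RowKind enum is richer. */
    enum Kind { UPSERT, DELETE }

    static final class Row {
        final Kind kind;
        final String key;
        final String value; // null for DELETE

        Row(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Same shape as a "collect only non-null results" wrapper. */
    static <T> void collectIfNotNull(T record, Consumer<T> out) {
        if (record != null) {
            out.accept(record);
        }
    }

    /** Suspected behaviour: a null (tombstone) value yields null, so nothing is emitted. */
    static void readDroppingTombstones(String key, String value, Consumer<Row> out) {
        Row row = (value == null) ? null : new Row(Kind.UPSERT, key, value);
        collectIfNotNull(row, out);
    }

    /** Expected behaviour: a tombstone becomes a DELETE for its key. */
    static void readWithTombstones(String key, String value, Consumer<Row> out) {
        Kind kind = (value == null) ? Kind.DELETE : Kind.UPSERT;
        out.accept(new Row(kind, key, value));
    }
}
```

A unit test for the real method named above would feed it a ConsumerRecord with a non-null key and a null value, then assert that a DELETE row is collected.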
[jira] [Comment Edited] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink
[ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538556#comment-17538556 ] Zhiwen Sun edited comment on FLINK-27663 at 5/18/22 3:42 AM: - 1. produce a DELETE message to kafka {code:java} echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic test_use --property "parse.key=true" --property "key.separator=#"{code} 2. run a flink sql job: {code:java} SET sql-client.execution.result-mode = tableau; create table delete_test( id bigint, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'test_use', 'properties.bootstrap.servers' = 'your broker', 'properties.group.id' = 'your group id', 'value.json.fail-on-missing-field' = 'false', 'value.json.ignore-parse-errors' = 'true', 'key.json.fail-on-missing-field' = 'false', 'key.json.ignore-parse-errors' = 'true', 'key.format' = 'json', 'value.format' = 'json'); select * from delete_test ; {code} 3. expect : {code:java} ++--+ | op | id | ++--+ | -D | 1 | {code} actual: no output was (Author: pensz): 1. produce a DELETE message to kafka {code:java} echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic test_use --property "parse.key=true" --property "key.separator=#"{code} 2. run a flink sql job: SET sql-client.execution.result-mode = tableau; create table delete_test( id bigint, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'test_use', 'properties.bootstrap.servers' = 'your broker', 'properties.group.id' = 'your group id', 'value.json.fail-on-missing-field' = 'false', 'value.json.ignore-parse-errors' = 'true', 'key.json.fail-on-missing-field' = 'false', 'key.json.ignore-parse-errors' = 'true', 'key.format' = 'json', 'value.format' = 'json'); select * from delete_test ; 3. 
expect : {code:java} ++--+ | op | id | ++--+ | -D | 1 | {code} actual: no output > upsert-kafka can't process delete message from upsert-kafka sink > > > Key: FLINK-27663 > URL: https://issues.apache.org/jira/browse/FLINK-27663 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0, 1.13.6, 1.14.4 >Reporter: Zhiwen Sun >Priority: Major > > upsert-kafka writes DELETE data as Kafka messages with null values (indicating a > tombstone for the key). > But when upsert-kafka is used as a source table to consume Kafka messages written > by the upsert-kafka sink, DELETE messages are ignored. > > Related SQL: > > > {code:java} > create table order_system_log( > id bigint, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'test_use', > 'properties.bootstrap.servers' = 'your broker', > 'properties.group.id' = 'your group id', > 'value.json.fail-on-missing-field' = 'false', > 'value.json.ignore-parse-errors' = 'true', > 'key.json.fail-on-missing-field' = 'false', > 'key.json.ignore-parse-errors' = 'true', > 'key.format' = 'json', > 'value.format' = 'json' > ); > select > * > from > order_system_log > ; > {code} > > > The problem may be caused by DeserializationSchema#deserialize: > this method does not collect a record when the subclass's deserialize returns null. > >
[jira] [Commented] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink
[ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538556#comment-17538556 ] Zhiwen Sun commented on FLINK-27663: 1. produce a DELETE message to kafka {code:java} echo '{"id":1}#null' | kafka-console-producer --broker-list $broker --topic test_use --property "parse.key=true" --property "key.separator=#"{code} 2. run a flink sql job: SET sql-client.execution.result-mode = tableau; create table delete_test( id bigint, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'test_use', 'properties.bootstrap.servers' = 'your broker', 'properties.group.id' = 'your group id', 'value.json.fail-on-missing-field' = 'false', 'value.json.ignore-parse-errors' = 'true', 'key.json.fail-on-missing-field' = 'false', 'key.json.ignore-parse-errors' = 'true', 'key.format' = 'json', 'value.format' = 'json'); select * from delete_test ; 3. expect : {code:java} ++--+ | op | id | ++--+ | -D | 1 | {code} actual: no output > upsert-kafka can't process delete message from upsert-kafka sink > > > Key: FLINK-27663 > URL: https://issues.apache.org/jira/browse/FLINK-27663 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.15.0, 1.13.6, 1.14.4 >Reporter: Zhiwen Sun >Priority: Major > > upsert-kafka writes DELETE data as Kafka messages with null values (indicating a > tombstone for the key). > But when upsert-kafka is used as a source table to consume Kafka messages written > by the upsert-kafka sink, DELETE messages are ignored.
> > Related SQL: > > > {code:java} > create table order_system_log( > id bigint, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'test_use', > 'properties.bootstrap.servers' = 'your broker', > 'properties.group.id' = 'your group id', > 'value.json.fail-on-missing-field' = 'false', > 'value.json.ignore-parse-errors' = 'true', > 'key.json.fail-on-missing-field' = 'false', > 'key.json.ignore-parse-errors' = 'true', > 'key.format' = 'json', > 'value.format' = 'json' > ); > select > * > from > order_system_log > ; > {code} > > > The problem may be caused by DeserializationSchema#deserialize: > this method does not collect a record when the subclass's deserialize returns null. > >
[jira] [Updated] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink
[ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhiwen Sun updated FLINK-27663: --- Component/s: Connectors / Kafka (was: Formats (JSON, Avro, Parquet, ORC, SequenceFile)) > upsert-kafka can't process delete message from upsert-kafka sink > > > Key: FLINK-27663 > URL: https://issues.apache.org/jira/browse/FLINK-27663 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0, 1.13.6, 1.14.4 >Reporter: Zhiwen Sun >Priority: Major > > upsert-kafka writes DELETE data as Kafka messages with null values (indicating a > tombstone for the key). > But when upsert-kafka is used as a source table to consume Kafka messages written > by the upsert-kafka sink, DELETE messages are ignored. > > Related SQL: > > > {code:java} > create table order_system_log( > id bigint, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'test_use', > 'properties.bootstrap.servers' = 'your broker', > 'properties.group.id' = 'your group id', > 'value.json.fail-on-missing-field' = 'false', > 'value.json.ignore-parse-errors' = 'true', > 'key.json.fail-on-missing-field' = 'false', > 'key.json.ignore-parse-errors' = 'true', > 'key.format' = 'json', > 'value.format' = 'json' > ); > select > * > from > order_system_log > ; > {code} > > > The problem may be caused by DeserializationSchema#deserialize: > this method does not collect a record when the subclass's deserialize returns null. > >
[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph
xinbinhuang commented on PR #17873: URL: https://github.com/apache/flink/pull/17873#issuecomment-1129531377 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #101: [FLINK-27366] Record schema on filesystem path
JingsongLi commented on code in PR #101: URL: https://github.com/apache/flink-table-store/pull/101#discussion_r875435428 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.schema; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.store.file.operation.Lock; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.file.utils.JsonSerdeUtil; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.Callable; + +import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles; + +/** Schema Manager to manage schema versions. */ +public class SchemaManager { + +private static final String SCHEMA_PREFIX = "schema-"; + +private final Path tableRoot; + +/** Default no lock. 
*/ +private Lock lock = Callable::call; + +public SchemaManager(Path tableRoot) { +this.tableRoot = tableRoot; +} + +public SchemaManager withLock(Lock lock) { +this.lock = lock; +return this; +} + +/** @return latest schema. */ +public Optional latest() { +try { +return listVersionedFiles(schemaDirectory(), SCHEMA_PREFIX) +.reduce(Math::max) +.map(this::schema); Review Comment: Schema changes are very infrequent (compared to snapshot generation).
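`latest()` lists the versioned schema files and keeps the maximum id with `reduce(Math::max)`. Since, as the reviewer notes, schemas change far less often than snapshots, that full listing stays small. The sketch below reproduces only the selection logic over plain file names, assuming the `schema-<n>` naming implied by `SCHEMA_PREFIX`; `latestSchemaId` is a hypothetical helper, not part of `SchemaManager`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class LatestSchemaSketch {

    static final String SCHEMA_PREFIX = "schema-";

    /**
     * Returns the highest version among file names shaped like "schema-<n>".
     * Other files in the directory (e.g. snapshots) are ignored.
     */
    static Optional<Long> latestSchemaId(List<String> fileNames) {
        return fileNames.stream()
                .filter(name -> name.matches("schema-\\d+"))
                .map(name -> Long.parseLong(name.substring(SCHEMA_PREFIX.length())))
                .reduce(Math::max); // numeric max, so "schema-10" beats "schema-9"
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList("schema-0", "schema-2", "schema-1", "snapshot-7");
        System.out.println(latestSchemaId(listing));                  // Optional[2]
        System.out.println(latestSchemaId(Collections.emptyList()));  // Optional.empty
    }
}
```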
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #101: [FLINK-27366] Record schema on filesystem path
JingsongLi commented on code in PR #101: URL: https://github.com/apache/flink-table-store/pull/101#discussion_r875435232 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.schema; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.store.file.operation.Lock; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.file.utils.JsonSerdeUtil; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.Callable; + +import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles; + +/** Schema Manager to manage schema versions. */ +public class SchemaManager { + +private static final String SCHEMA_PREFIX = "schema-"; + +private final Path tableRoot; + +/** Default no lock. 
*/ +private Lock lock = Callable::call; + +public SchemaManager(Path tableRoot) { +this.tableRoot = tableRoot; +} + +public SchemaManager withLock(Lock lock) { +this.lock = lock; +return this; +} + +/** @return latest schema. */ +public Optional latest() { +try { +return listVersionedFiles(schemaDirectory(), SCHEMA_PREFIX) +.reduce(Math::max) +.map(this::schema); Review Comment: I don't think it's necessary for schemas, because there won't be many schema versions.
[GitHub] [flink-table-store] openinx commented on pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on PR #117: URL: https://github.com/apache/flink-table-store/pull/117#issuecomment-1129526584 The failing unit tests are: ``` Error: Tests run: 10, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 18.257 s <<< FAILURE! - in org.apache.flink.table.store.file.mergetree.MergeTreeTest Error: testWriteMany Time elapsed: 5.328 s <<< FAILURE! org.opentest4j.AssertionFailedError: expected: [TestRecord{kind=ADD, k=0, v=1439555325}, TestRecord{kind=ADD, k=2, v=-87577981}, TestRecord{kind=ADD, k=3, v=-1671555357}, TestRecord{kind=ADD, k=4, v=-1885526753}, TestRecord{kind=ADD, k=6, v=-1780307938}, TestRecord{kind=ADD, k=7, v=-1031004329}, TestRecord{kind=ADD, k=8, v=-1545344531}, TestRecord{kind=ADD, k=14, v=1882741690}, TestRecord{kind=ADD, k=15, v=1399126480}, TestRecord{kind=ADD, k=16, v=1331804956}, TestRecord{kind=ADD, k=18, v=1642412855}, TestRecord{kind=ADD, k=20, v=-413068575}, TestRecord{kind=ADD, k=22, v=1865500343}, TestRecord{kind=ADD, k=23, v=-876643231}, TestRecord{kind=ADD, k=26, v=-351327562}, TestRecord{kind=ADD, k=28, v=-2031549655}, TestRecord{kind=ADD, k=30, v=-241253083}, TestRecord{kind=ADD, k=32, v=1256513581}, TestRecord{kind=ADD, k=33, v=-1094310640}, TestRecord{kind=ADD, k=34, v=-206957727}, TestRecord{kind=ADD, k=35, v=545094851}, TestRecord{kind=ADD, k=36, v=2090965492}, TestRecord{kind=ADD, k=37, v=-612267756}, TestRecord{kind=ADD, k=41, v=-1668976908}, TestRecord{kind=ADD, k=42, v=1582283169}, TestRecord{kind=ADD, k=43, v=-1390868689}, TestRecord{kind=ADD, k=44, v=-95457355}, TestRecord{kind=ADD, k=46, v=-97023}, TestRecord{kind=ADD, k=47, v=770974724}, TestRecord{kind=ADD, k=48, v=-256528682}, TestRecord{kind=ADD, k=49, v=871707733}, TestRecord{kind=ADD, k=50, v=-108981698}, TestRecord{kind=ADD, k=52, v=-1922275264}, TestRecord{kind=ADD, k=53, v=1127951319}, TestRecord{kind=ADD, k=55, v=1949021850}, TestRecord{kind=ADD, k=56, v=-1780698771}, TestRecord{kind=ADD, k=57, v=-678952533}, TestRecord{kind=ADD, k=60, v=1786413275}, 
TestRecord{kind=ADD, k=61, v=1073764052}, TestRecord{kind=ADD, k=62, v=1119198658}, TestRecord{kind=ADD, k=63, v=-1960775568}, TestRecord{kind=ADD, k=64, v=1396679991}, TestRecord{kind=ADD, k=65, v=-1823346675}, TestRecord{kind=ADD, k=66, v=49904175}, TestRecord{kind=ADD, k=67, v=-1213641065}, TestRecord{kind=ADD, k=68, v=1017599222}, TestRecord{kind=ADD, k=70, v=730983381}, TestRecord{kind=ADD, k=71, v=632902325}, TestRecord{kind=ADD, k=72, v=1677474125}, TestRecord{kind=ADD, k=6322, v=-1577426721}, TestRecord{kind=ADD, k=6713, v=477480864}, TestRecord{kind=ADD, k=6766, v=-195595673}, TestRecord{kind=ADD, k=6878, v=922070171}, TestRecord{kind=ADD, k=6908, v=1279871284}, TestRecord{kind=ADD, k=6964, v=-183467818}, TestRecord{kind=ADD, k=7212, v=1913728452}, TestRecord{kind=ADD, k=7277, v=777112023}, TestRecord{kind=ADD, k=7297, v=-451539894}, TestRecord{kind=ADD, k=7306, v=363784635}, TestRecord{kind=ADD, k=7378, v=-1455922425}, TestRecord{kind=ADD, k=7532, v=-2058675165}, TestRecord{kind=ADD, k=7535, v=885332866}, TestRecord{kind=ADD, k=7566, v=-1491380324}, TestRecord{kind=ADD, k=7626, v=383380493}, TestRecord{kind=ADD, k=7701, v=1901042762}, TestRecord{kind=ADD, k=7797, v=988033575}, TestRecord{kind=ADD, k=7972, v=-1102848478}, TestRecord{kind=ADD, k=7984, v=1336320866}, TestRecord{kind=ADD, k=8335, v=-166648690}, TestRecord{kind=ADD, k=8362, v=-1958838012}, TestRecord{kind=ADD, k=8676, v=-1305920106}, TestRecord{kind=ADD, k=8824, v=-1534970469}, TestRecord{kind=ADD, k=9003, v=-25406237}, TestRecord{kind=ADD, k=9023, v=-1444326810}, TestRecord{kind=ADD, k=9125, v=-307503062}, TestRecord{kind=ADD, k=9144, v=-1251249360}, TestRecord{kind=ADD, k=9291, v=158168057}, TestRecord{kind=ADD, k=9346, v=1456276531}, TestRecord{kind=ADD, k=9580, v=-1180486045}, TestRecord{kind=ADD, k=9683, v=-1625710048}, TestRecord{kind=ADD, k=9698, v=169574801}, TestRecord{kind=ADD, k=9918, v=1835226686}, TestRecord{kind=ADD, k=9922, v=-1687709970}, TestRecord{kind=ADD, k=9993, 
v=-63074385}, TestRecord{kind=ADD, k=9995, v=209840468}, TestRecord{kind=ADD, k=9996, v=-1926512297}, TestRecord{kind=ADD, k=9997, v=680790216}] [INFO] Error: Tests run: 410, Failures: 2, Errors: 0, Skipped: 0 ```
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
JingsongLi commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875433467 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java: ## @@ -112,26 +120,17 @@ public void delete(String fileName) { FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName)); } -private class ManifestEntryBulkWriterFactory implements BulkWriter.Factory { - -@Override -public BulkWriter create(FSDataOutputStream out) throws IOException { -return new BaseBulkWriter<>(writerFactory.create(out), serializer::toRow); -} -} - private class ManifestEntryWriter extends BaseFileWriter { -private final FieldStatsCollector statsCollector; - +private final FieldStatsCollector fieldStatsCollector; Review Comment: naming: `partitionStatsCollector`.
[GitHub] [flink-table-store] openinx closed pull request #125: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx closed pull request #125: [FLINK-27543] Hide column statistics collector inside the file format writer. URL: https://github.com/apache/flink-table-store/pull/125
[GitHub] [flink-table-store] openinx commented on pull request #125: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on PR #125: URL: https://github.com/apache/flink-table-store/pull/125#issuecomment-1129525292 Let's close this PR as we've agreed to use another one: https://github.com/apache/flink-table-store/pull/117
[GitHub] [flink-table-store] tsreaper commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction
tsreaper commented on PR #121: URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1129520834 > How should I configure `ConfigOption MERGE_ENGINE ` to get `columnName.aggregate-function = sum` from the parameter `FileStoreOptions` of `static FileStoreImpl createWithPrimaryKey` function? You can get all config options from the `FileStoreOptions.options` member. You might need to add a method to `FileStoreOptions` to fetch an arbitrary key. ```sql CREATE TABLE IF NOT EXISTS T ( j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED ) WITH ('merge-engine'='partial-update', 'specialkey'='hahahaha'); ``` ![20220518110552](https://user-images.githubusercontent.com/19909549/168949470-9d7e88e3-08a5-4666-b064-53b995081702.jpg)
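Following the suggestion to read arbitrary keys from `FileStoreOptions.options`, a per-column setting such as `columnName.aggregate-function = sum` could be extracted by suffix matching over the raw option map. This is a minimal sketch under that assumption: `aggregateFunctions` and the `.aggregate-function` suffix handling are hypothetical and are not an existing table-store API.

```java
import java.util.Map;
import java.util.TreeMap;

public class AggregateOptionSketch {

    // Hypothetical key suffix taken from the question; not a confirmed table-store option.
    static final String SUFFIX = ".aggregate-function";

    /** Maps column name -> function for every key shaped like "<column>.aggregate-function". */
    static Map<String, String> aggregateFunctions(Map<String, String> options) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String key = entry.getKey();
            // Require a non-empty column name in front of the suffix.
            if (key.endsWith(SUFFIX) && key.length() > SUFFIX.length()) {
                String column = key.substring(0, key.length() - SUFFIX.length());
                result.put(column, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new TreeMap<>();
        options.put("specialkey", "hahahaha");       // unrelated keys are ignored
        options.put("a.aggregate-function", "sum");
        options.put("b.aggregate-function", "max");
        System.out.println(aggregateFunctions(options)); // {a=sum, b=max}
    }
}
```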
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875418990 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java: ## @@ -52,23 +55,32 @@ public class ManifestFile { private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class); private final RowType partitionType; +private final RowType entryType; private final ManifestEntrySerializer serializer; +private final FieldStatsArraySerializer statsSerializer; private final BulkFormat readerFactory; private final BulkWriter.Factory writerFactory; +private final FileStatsExtractor fileStatsExtractor; private final FileStorePathFactory pathFactory; private final long suggestedFileSize; private ManifestFile( RowType partitionType, +RowType entryType, Review Comment: Please see: https://github.com/apache/flink-table-store/pull/117/files#diff-4b9ebe9adecbf113c86f69fe4ef1c4fbb0395798eefe476e2d6f2de629e09e53R164-R166
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875418826 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java: ## @@ -52,23 +55,32 @@ public class ManifestFile { private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class); private final RowType partitionType; +private final RowType entryType; private final ManifestEntrySerializer serializer; +private final FieldStatsArraySerializer statsSerializer; private final BulkFormat readerFactory; private final BulkWriter.Factory writerFactory; +private final FileStatsExtractor fileStatsExtractor; private final FileStorePathFactory pathFactory; private final long suggestedFileSize; private ManifestFile( RowType partitionType, +RowType entryType, Review Comment: Although we don't rely on any metrics generated by the `MetricFileWriter`.
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875417510 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java: ## @@ -112,26 +124,22 @@ public void delete(String fileName) { FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName)); } -private class ManifestEntryBulkWriterFactory implements BulkWriter.Factory { - -@Override -public BulkWriter create(FSDataOutputStream out) throws IOException { -return new BaseBulkWriter<>(writerFactory.create(out), serializer::toRow); -} +private FileWriter.Factory createManifestEntryWriterFactory() { Review Comment: Yes, I like this idea.
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875417184 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java: ## @@ -52,23 +55,32 @@ public class ManifestFile { private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class); private final RowType partitionType; +private final RowType entryType; private final ManifestEntrySerializer serializer; +private final FieldStatsArraySerializer statsSerializer; private final BulkFormat readerFactory; private final BulkWriter.Factory writerFactory; +private final FileStatsExtractor fileStatsExtractor; private final FileStorePathFactory pathFactory; private final long suggestedFileSize; private ManifestFile( RowType partitionType, +RowType entryType, ManifestEntrySerializer serializer, +FieldStatsArraySerializer statsSerializer, Review Comment: This argument can be removed now.
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875416965 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java: ## @@ -52,23 +55,32 @@ public class ManifestFile { private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class); private final RowType partitionType; +private final RowType entryType; private final ManifestEntrySerializer serializer; +private final FieldStatsArraySerializer statsSerializer; private final BulkFormat readerFactory; private final BulkWriter.Factory writerFactory; +private final FileStatsExtractor fileStatsExtractor; private final FileStorePathFactory pathFactory; private final long suggestedFileSize; private ManifestFile( RowType partitionType, +RowType entryType, Review Comment: We changed this `ManifestFile` class because `BaseFileWriter` now uses `FileWriter.Factory` (it used `BulkWriter.Factory` before). So we have to change the `ManifestEntryWriter` implementation so that it can still extend `BaseFileWriter`.
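The removed `ManifestEntryBulkWriterFactory` wrapped a row-level writer and converted each entry with `serializer::toRow`. The same idea carries over when the base class switches factory types: an adapter that converts entries and delegates to a row writer. The sketch below uses hypothetical `SimpleWriter`/`SimpleWriterFactory` interfaces, not the actual `FileWriter.Factory` or `BulkWriter.Factory` signatures, which the excerpt does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class WriterAdapterSketch {

    /** Hypothetical stand-in for a format-level writer; not the table-store FileWriter API. */
    interface SimpleWriter<T> {
        void write(T element);
    }

    /** Hypothetical stand-in for a factory that opens a writer for a given path. */
    interface SimpleWriterFactory<T> {
        SimpleWriter<T> create(String path);
    }

    /**
     * Wraps a row-level factory so callers can write higher-level entries,
     * converting each one first (the role serializer::toRow played in the removed code).
     */
    static <T, R> SimpleWriterFactory<T> adapt(SimpleWriterFactory<R> rowFactory, Function<T, R> toRow) {
        return path -> {
            SimpleWriter<R> delegate = rowFactory.create(path);
            return entry -> delegate.write(toRow.apply(entry));
        };
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        // Row-level factory that just collects rows in memory.
        SimpleWriterFactory<String> rowFactory = path -> rows::add;
        SimpleWriterFactory<Integer> entryFactory = adapt(rowFactory, i -> "row-" + i);

        SimpleWriter<Integer> writer = entryFactory.create("manifest-0");
        writer.write(1);
        writer.write(2);
        System.out.println(rows); // [row-1, row-2]
    }
}
```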
[GitHub] [flink-table-store] openinx commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
openinx commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r875414820 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java: ## @@ -237,7 +210,14 @@ public RollingKvWriter(Supplier writerFactory, long targetFileSize private Supplier createWriterFactory(int level) { return () -> { try { -return new KvFileWriter(new KvBulkWriterFactory(), pathFactory.newPath(), level); +FileWriter.Factory factory = Review Comment: Yes, I agree. It will be clearer if we move the factory initialization to the constructor.
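Moving the factory initialization into the constructor means it is built once per owner instead of once for every writer produced by `createWriterFactory(level)`. The sketch below shows that shape with hypothetical `DataFileWriterSketch` and `KvWriter` classes, using a placeholder `Object` in place of the real `FileWriter.Factory`.

```java
import java.util.function.Supplier;

public class FactoryHoistSketch {

    /** Hypothetical writer: records the shared factory and the level it was created for. */
    static final class KvWriter {
        final Object factory;
        final int level;

        KvWriter(Object factory, int level) {
            this.factory = factory;
            this.level = level;
        }
    }

    /** Hypothetical owner class; counts how many factories it builds. */
    static final class DataFileWriterSketch {
        int factoriesBuilt = 0;
        // Built once in the constructor instead of inside every supplier invocation.
        private final Object writerFactory;

        DataFileWriterSketch() {
            this.writerFactory = buildFactory();
        }

        private Object buildFactory() {
            factoriesBuilt++;
            return new Object(); // placeholder for the real factory
        }

        Supplier<KvWriter> createWriterFactory(int level) {
            return () -> new KvWriter(writerFactory, level);
        }
    }

    public static void main(String[] args) {
        DataFileWriterSketch owner = new DataFileWriterSketch();
        KvWriter w0 = owner.createWriterFactory(0).get();
        KvWriter w1 = owner.createWriterFactory(1).get();
        System.out.println(owner.factoriesBuilt);     // 1
        System.out.println(w0.factory == w1.factory); // true
    }
}
```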
[jira] [Assigned] (FLINK-27008) Document the configurable parameters of the helm chart and their default values
[ https://issues.apache.org/jira/browse/FLINK-27008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang reassigned FLINK-27008: - Assignee: Nicholas Jiang > Document the configurable parameters of the helm chart and their default > values > --- > > Key: FLINK-27008 > URL: https://issues.apache.org/jira/browse/FLINK-27008 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Nicholas Jiang >Priority: Major > Labels: starter > Fix For: kubernetes-operator-1.0.0 > > > We might need a table to document all the configurable parameters of the helm > chart and their default values. It could be put in the helm section[1]. > ||Parameters||Description||Default Value|| > |watchNamespaces|List of kubernetes namespaces to watch for FlinkDeployment > changes, empty means all namespaces.| | > |image.repository|The image repository of > flink-kubernetes-operator|ghcr.io/apache/flink-kubernetes-operator| > |...|...| | > [1]. > https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes
yunfengzhou-hub commented on code in PR #100: URL: https://github.com/apache/flink-ml/pull/100#discussion_r875396471 ## flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark.datagenerator.common; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator; +import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.common.param.HasFeaturesCol; +import org.apache.flink.ml.common.param.HasLabelCol; +import org.apache.flink.ml.common.param.HasWeightCol; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NumberSequenceIterator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** A DataGenerator which creates a table of features, label and weight. */ +public class LabeledPointWithWeightGenerator +implements InputDataGenerator, +HasFeaturesCol, +HasLabelCol, +HasWeightCol, +HasVectorDim { + +public static final Param FEATURE_ARITY = +new IntParam( +"featureArity", +"Arity of each feature. " ++ "If set to positive value, each feature would be an integer in range [0, arity - 1]. 
" ++ "If set to zero, each feature would be a continuous double in range [0, 1).", +2, +ParamValidators.gtEq(0)); + +public static final Param LABEL_ARITY = +new IntParam( +"labelArity", +"Arity of label. " ++ "If set to positive value, the label would be an integer in range [0, arity - 1]. " ++ "If set to zero, the label would be a continuous double in range [0, 1).", +2, +ParamValidators.gtEq(0)); + +public static final Param WEIGHT_ARITY = +new IntParam( +"weightArity", +"Arity of weight. " ++ "If set to positive value, the weight would be an integer in range [1, arity]. " ++ "If set to zero, weight would be a continuous double in range [0, 1).", +1, +ParamValidators.gtEq(0)); + +private final Map, Object> paramMap = new HashMap<>(); + +public LabeledPointWithWeightGenerator() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +public int getFeatureArity() { +return get(FEATURE_ARITY); +} + +public LabeledPointWithWeightGenerator setFeatureArity(int value) { +return set(FEATURE_ARITY, value); +} + +public int getLabelArity() { +return get(LABEL_ARITY); +} + +public LabeledPointWithWeightGenerator setLabelArity(int value) { +return set(LABEL_ARITY, value); +} + +public int getWeightArity() { +return get(WEIGHT_ARITY); +} + +public LabeledPointWithWeightGenerator
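The three arity parameters share one rule: a positive arity yields an integer from a fixed range, and zero yields a continuous double in [0, 1), with weights shifted to [1, arity]. A minimal sketch of those sampling rules as stated in the parameter descriptions follows. `AritySketch` and its methods are hypothetical and do not claim to reproduce how `LabeledPointWithWeightGenerator` actually draws its values.

```java
import java.util.Arrays;
import java.util.Random;

public class AritySketch {

    /** Feature/label rule: arity > 0 -> integer in [0, arity - 1]; arity == 0 -> double in [0, 1). */
    static double sampleFeatureOrLabel(Random random, int arity) {
        checkArity(arity);
        return arity > 0 ? random.nextInt(arity) : random.nextDouble();
    }

    /** Weight rule: arity > 0 -> integer in [1, arity]; arity == 0 -> double in [0, 1). */
    static double sampleWeight(Random random, int arity) {
        checkArity(arity);
        return arity > 0 ? random.nextInt(arity) + 1 : random.nextDouble();
    }

    /** Same constraint as ParamValidators.gtEq(0) in the diff. */
    private static void checkArity(int arity) {
        if (arity < 0) {
            throw new IllegalArgumentException("arity must be >= 0, got " + arity);
        }
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        double[] features = new double[4];
        for (int i = 0; i < features.length; i++) {
            features[i] = sampleFeatureOrLabel(random, 2); // 0.0 or 1.0 with the default arity of 2
        }
        double label = sampleFeatureOrLabel(random, 2);
        double weight = sampleWeight(random, 1); // always 1.0 with the default weight arity of 1
        System.out.println(Arrays.toString(features) + " label=" + label + " weight=" + weight);
    }
}
```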
[jira] [Updated] (FLINK-27670) Python wrappers for Kinesis Sinks
[ https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-27670: - Fix Version/s: 1.16.0 (was: 1.15.1) > Python wrappers for Kinesis Sinks > - > > Key: FLINK-27670 > URL: https://issues.apache.org/jira/browse/FLINK-27670 > Project: Flink > Issue Type: Improvement > Components: API / Python, Connectors / Kinesis >Reporter: Zichen Liu >Priority: Major > Fix For: 1.16.0 > > > Create Python Wrappers for the new Kinesis Streams sink and the Kinesis > Firehose sink. > An example implementation may be found here > [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.
[jira] [Updated] (FLINK-27670) Python wrappers for Kinesis Sinks
[ https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-27670: - Component/s: API / Python > Python wrappers for Kinesis Sinks > - > > Key: FLINK-27670 > URL: https://issues.apache.org/jira/browse/FLINK-27670 > Project: Flink > Issue Type: Improvement > Components: API / Python, Connectors / Kinesis >Reporter: Zichen Liu >Priority: Major > Fix For: 1.15.1 > > > Create Python Wrappers for the new Kinesis Streams sink and the Kinesis > Firehose sink. > An example implementation may be found here > [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.
[jira] [Created] (FLINK-27674) Elasticsearch6SinkE2ECase test hangs
Huang Xingbo created FLINK-27674: Summary: Elasticsearch6SinkE2ECase test hangs Key: FLINK-27674 URL: https://issues.apache.org/jira/browse/FLINK-27674 Project: Flink Issue Type: Bug Components: Connectors / ElasticSearch Affects Versions: 1.16.0 Reporter: Huang Xingbo {code:java}
2022-05-17T14:23:20.2220667Z May 17 14:23:20 [INFO] Running org.apache.flink.streaming.tests.Elasticsearch6SinkE2ECase
2022-05-17T16:46:42.2757438Z ==
2022-05-17T16:46:42.2769104Z === WARNING: This task took already 95% of the available time budget of 284 minutes ===
2022-05-17T16:46:42.2769752Z ==
2022-05-17T16:46:42.2777415Z ==
2022-05-17T16:46:42.2778082Z The following Java processes are running (JPS)
2022-05-17T16:46:42.2778711Z ==
2022-05-17T16:46:42.4370838Z 366642 surefirebooter7296810110127514313.jar
2022-05-17T16:46:42.4371962Z 323278 Launcher
2022-05-17T16:46:42.4378950Z 384312 Jps
2022-05-17T16:46:42.4452836Z ==
2022-05-17T16:46:42.4453843Z Printing stack trace of Java process 366642
2022-05-17T16:46:42.4454796Z ==
2022-05-17T16:46:42.9967200Z 2022-05-17 16:46:42
2022-05-17T16:46:42.9968158Z Full thread dump OpenJDK 64-Bit Server VM (25.332-b09 mixed mode):
2022-05-17T16:46:42.9968402Z
2022-05-17T16:46:42.9968953Z "Attach Listener" #6017 daemon prio=9 os_prio=0 tid=0x7f4cf0007000 nid=0x5dd53 waiting on condition [0x]
2022-05-17T16:46:42.9969533Z    java.lang.Thread.State: RUNNABLE
2022-05-17T16:46:42.9969714Z
2022-05-17T16:46:42.9970494Z "ForkJoinPool-1-worker-0" #6011 daemon prio=5 os_prio=0 tid=0x7f4b4ed92800 nid=0x5cfea waiting on condition [0x7f4b35b9b000]
2022-05-17T16:46:42.9971118Z    java.lang.Thread.State: TIMED_WAITING (parking)
2022-05-17T16:46:42.9971752Z     at sun.misc.Unsafe.park(Native Method)
2022-05-17T16:46:42.9972494Z     - parking to wait for <0x939d6cd0> (a java.util.concurrent.ForkJoinPool)
2022-05-17T16:46:42.9973134Z     at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
2022-05-17T16:46:42.9973801Z     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
2022-05-17T16:46:43.0113835Z     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
2022-05-17T16:46:43.0114433Z
2022-05-17T16:46:43.0118656Z "flink-rest-client-netty-thread-1" #6006 daemon prio=5 os_prio=0 tid=0x7f4b4ed8f000 nid=0x5bb03 runnable [0x7f4b34181000]
2022-05-17T16:46:43.0119406Z    java.lang.Thread.State: RUNNABLE
2022-05-17T16:46:43.0120063Z     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
2022-05-17T16:46:43.0120921Z     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
2022-05-17T16:46:43.0121666Z     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
2022-05-17T16:46:43.0122507Z     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
2022-05-17T16:46:43.0123735Z     - locked <0xe4e90c80> (a org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySet)
2022-05-17T16:46:43.0124873Z     - locked <0xe4e90a90> (a java.util.Collections$UnmodifiableSet)
2022-05-17T16:46:43.0125863Z     - locked <0xe4e909b8> (a sun.nio.ch.EPollSelectorImpl)
2022-05-17T16:46:43.0126648Z     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
2022-05-17T16:46:43.0127370Z     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
2022-05-17T16:46:43.0128211Z     at org.apache.flink.shaded.netty4.io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:68)
2022-05-17T16:46:43.0129260Z     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:810)
2022-05-17T16:46:43.0130119Z     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:457)
2022-05-17T16:46:43.0131134Z     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
2022-05-17T16:46:43.0137869Z     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2022-05-17T16:46:43.0138683Z     at java.lang.Thread.run(Thread.java:750)
{code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35749=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a -- This message was sent by Atlassian Jira
[jira] [Comment Edited] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running
[ https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538539#comment-17538539 ] Nicholas Jiang edited comment on FLINK-27257 at 5/18/22 1:44 AM: - I discussed this offline with [~wangyang0918]. It's hard to improve the isJobRunning implementation because the JobStatusMessages returned by ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail. As a result, there is no way to tell whether all tasks of the job are in the RUNNING ExecutionState, or, for batch jobs, whether the job is really running. The current idea is this: if the error is "not all required tasks are currently running", we retry triggering the savepoint in the next reconciliation until it is triggered successfully. [~gyfora][~wangyang0918] WDYT? was (Author: nicholasjiang): I discussed this offline with [~wangyang0918]. It's hard to improve the isJobRunning implementation because the JobStatusMessages returned by ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail. As a result, there is no way to tell whether all tasks of the job are in the RUNNING ExecutionState, or, for batch jobs, whether the job is really running. The current idea is this: if the error is "Not all required tasks are currently running", we retry triggering the savepoint in the next reconciliation until it is triggered successfully. [~gyfora][~wangyang0918] WDYT? 
> Flink kubernetes operator triggers savepoint failed because of not all tasks > running > > > Key: FLINK-27257 > URL: https://issues.apache.org/jira/browse/FLINK-27257 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Nicholas Jiang >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > {code:java} > 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService [INFO > ][default/flink-example-statemachine] Fetching savepoint result with > triggerId: 182d7f176496856d7b33fe2f3767da18 > 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService > [ERROR][default/flink-example-statemachine] Savepoint error > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint > triggering task Source: Custom Source (1/2) of job > is not being executed at the moment. > Aborting checkpoint. Failure reason: Not all required tasks are currently > running. > at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143) > at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at
[jira] [Commented] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running
[ https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538539#comment-17538539 ] Nicholas Jiang commented on FLINK-27257: I discussed this offline with [~wangyang0918]. It's hard to improve the isJobRunning implementation because the JobStatusMessages returned by ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail. As a result, there is no way to tell whether all tasks of the job are in the RUNNING ExecutionState, or, for batch jobs, whether the job is really running. The current idea is this: if the error is "Not all required tasks are currently running", we retry triggering the savepoint in the next reconciliation until it is triggered successfully. [~gyfora][~wangyang0918] WDYT? > Flink kubernetes operator triggers savepoint failed because of not all tasks > running > > > Key: FLINK-27257 > URL: https://issues.apache.org/jira/browse/FLINK-27257 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Nicholas Jiang >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > {code:java} > 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService [INFO > ][default/flink-example-statemachine] Fetching savepoint result with > triggerId: 182d7f176496856d7b33fe2f3767da18 > 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService > [ERROR][default/flink-example-statemachine] Savepoint error > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint > triggering task Source: Custom Source (1/2) of job > is not being executed at the moment. > Aborting checkpoint. Failure reason: Not all required tasks are currently > running. 
> at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143) > at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > 2022-04-15 02:38:56,693 o.a.f.k.o.o.SavepointObserver > [ERROR][default/flink-example-statemachine] Checkpoint triggering task > Source: Custom Source (1/2) of job is not > being executed at the moment. Aborting checkpoint. Failure reason: Not all > required tasks are currently running. {code} > How to reproduce? > Update arbitrary fields (e.g. parallelism) along with > {{{}savepointTriggerNonce{}}}. > > The root cause might be that the running state returned by > {{ClusterClient#listJobs()}} does not mean all the
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes
yunfengzhou-hub commented on code in PR #100: URL: https://github.com/apache/flink-ml/pull/100#discussion_r875396471 ## flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark.datagenerator.common; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator; +import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.common.param.HasFeaturesCol; +import org.apache.flink.ml.common.param.HasLabelCol; +import org.apache.flink.ml.common.param.HasWeightCol; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NumberSequenceIterator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** A DataGenerator which creates a table of features, label and weight. */ +public class LabeledPointWithWeightGenerator +implements InputDataGenerator, +HasFeaturesCol, +HasLabelCol, +HasWeightCol, +HasVectorDim { + +public static final Param FEATURE_ARITY = +new IntParam( +"featureArity", +"Arity of each feature. " ++ "If set to positive value, each feature would be an integer in range [0, arity - 1]. 
" ++ "If set to zero, each feature would be a continuous double in range [0, 1).", +2, +ParamValidators.gtEq(0)); + +public static final Param LABEL_ARITY = +new IntParam( +"labelArity", +"Arity of label. " ++ "If set to positive value, the label would be an integer in range [0, arity - 1]. " ++ "If set to zero, the label would be a continuous double in range [0, 1).", +2, +ParamValidators.gtEq(0)); + +public static final Param WEIGHT_ARITY = +new IntParam( +"weightArity", +"Arity of weight. " ++ "If set to positive value, the weight would be an integer in range [1, arity]. " ++ "If set to zero, weight would be a continuous double in range [0, 1).", +1, +ParamValidators.gtEq(0)); + +private final Map, Object> paramMap = new HashMap<>(); + +public LabeledPointWithWeightGenerator() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +public int getFeatureArity() { +return get(FEATURE_ARITY); +} + +public LabeledPointWithWeightGenerator setFeatureArity(int value) { +return set(FEATURE_ARITY, value); +} + +public int getLabelArity() { +return get(LABEL_ARITY); +} + +public LabeledPointWithWeightGenerator setLabelArity(int value) { +return set(LABEL_ARITY, value); +} + +public int getWeightArity() { +return get(WEIGHT_ARITY); +} + +public LabeledPointWithWeightGenerator
[jira] (FLINK-27257) Flink kubernetes operator triggers savepoint failed because of not all tasks running
[ https://issues.apache.org/jira/browse/FLINK-27257 ] Nicholas Jiang deleted comment on FLINK-27257: was (Author: nicholasjiang): [~gyfora], sorry for the late reply. I will push a pull request today. > Flink kubernetes operator triggers savepoint failed because of not all tasks > running > > > Key: FLINK-27257 > URL: https://issues.apache.org/jira/browse/FLINK-27257 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Nicholas Jiang >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > {code:java} > 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService [INFO > ][default/flink-example-statemachine] Fetching savepoint result with > triggerId: 182d7f176496856d7b33fe2f3767da18 > 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService > [ERROR][default/flink-example-statemachine] Savepoint error > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint > triggering task Source: Custom Source (1/2) of job > is not being executed at the moment. > Aborting checkpoint. Failure reason: Not all required tasks are currently > running. 
> at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143) > at > org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105) > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > 2022-04-15 02:38:56,693 o.a.f.k.o.o.SavepointObserver > [ERROR][default/flink-example-statemachine] Checkpoint triggering task > Source: Custom Source (1/2) of job is not > being executed at the moment. Aborting checkpoint. Failure reason: Not all > required tasks are currently running. {code} > How to reproduce? > Update arbitrary fields (e.g. parallelism) along with > {{{}savepointTriggerNonce{}}}. > > The root cause might be that the running state returned by > {{ClusterClient#listJobs()}} does not mean all the tasks are running. -- This message was sent by Atlassian Jira (v8.20.7#820007)
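Nicholas Jiang's FLINK-27257 comment proposes treating the "Not all required tasks are currently running" failure as transient and retrying the savepoint trigger in the next reconciliation. The following is a minimal, hypothetical sketch of that classification logic, not the operator's actual code. `SavepointRetrySketch`, `Outcome` and `reconcile` are illustrative names. The `do/while` loop stands in for the operator's reconciliation loop, and the `Runnable` stands in for the real savepoint trigger call.

```java
/**
 * Hypothetical sketch (not the Flink Kubernetes operator's code) of the FLINK-27257 retry idea:
 * a savepoint trigger that fails only because some tasks are not running yet is treated as
 * "try again in the next reconciliation" instead of a hard error.
 */
public class SavepointRetrySketch {

    /** Failure reason text as it appears in the CheckpointException logged in the issue. */
    static final String NOT_ALL_TASKS_RUNNING = "Not all required tasks are currently running";

    /** Result of one reconciliation step (hypothetical type). */
    enum Outcome {
        TRIGGERED,
        RETRY_NEXT_RECONCILIATION,
        FAILED
    }

    /** True if the throwable or any of its causes mentions the "tasks not running" reason. */
    static boolean isTasksNotRunning(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            String msg = cur.getMessage();
            if (msg != null && msg.contains(NOT_ALL_TASKS_RUNNING)) {
                return true;
            }
        }
        return false;
    }

    /** Tries to trigger a savepoint once and classifies any failure. */
    static Outcome reconcile(Runnable triggerSavepoint) {
        try {
            triggerSavepoint.run();
            return Outcome.TRIGGERED;
        } catch (RuntimeException e) {
            return isTasksNotRunning(e) ? Outcome.RETRY_NEXT_RECONCILIATION : Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated trigger: the first two attempts hit the transient failure, the third succeeds.
        Runnable trigger =
                () -> {
                    if (attempts[0]++ < 2) {
                        throw new IllegalStateException(
                                "Aborting checkpoint. Failure reason: " + NOT_ALL_TASKS_RUNNING + ".");
                    }
                };
        Outcome outcome;
        int round = 0;
        do {
            outcome = reconcile(trigger);
            round++;
            System.out.println("reconciliation " + round + ": " + outcome);
        } while (outcome == Outcome.RETRY_NEXT_RECONCILIATION);
    }
}
```

Running `main` prints two `RETRY_NEXT_RECONCILIATION` rounds followed by `TRIGGERED`. Matching on the message text is fragile. A real implementation might also cap the number of retries so that a job that never reaches RUNNING does not retry forever.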
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes
yunfengzhou-hub commented on code in PR #100: URL: https://github.com/apache/flink-ml/pull/100#discussion_r875393271 ## flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #100: [FLINK-27096] Add LabeledPointWithWeightGenerator and Benchmark Configuration for KMeans and NaiveBayes
yunfengzhou-hub commented on code in PR #100: URL: https://github.com/apache/flink-ml/pull/100#discussion_r875392599 ## flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/LabeledPointWithWeightGenerator.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark.datagenerator.common; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator; +import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.common.param.HasFeaturesCol; +import org.apache.flink.ml.common.param.HasLabelCol; +import org.apache.flink.ml.common.param.HasWeightCol; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NumberSequenceIterator; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** A DataGenerator which creates a table of features, label and weight. */ +public class LabeledPointWithWeightGenerator +implements InputDataGenerator, +HasFeaturesCol, +HasLabelCol, +HasWeightCol, +HasVectorDim { + +public static final Param FEATURE_ARITY = +new IntParam( +"featureArity", +"Arity of each feature. " ++ "If set to positive value, each feature would be an integer in range [0, arity - 1]. 
" ++ "If set to zero, each feature would be a continuous double in range [0, 1).", +2, +ParamValidators.gtEq(0)); + +public static final Param LABEL_ARITY = +new IntParam( +"labelArity", +"Arity of label. " ++ "If set to positive value, the label would be an integer in range [0, arity - 1]. " ++ "If set to zero, the label would be a continuous double in range [0, 1).", +2, +ParamValidators.gtEq(0)); + +public static final Param WEIGHT_ARITY = Review Comment: I agree. I'll make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
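The `FEATURE_ARITY`, `LABEL_ARITY` and `WEIGHT_ARITY` descriptions in the diff define how each generated value depends on its arity. A positive arity yields an integer in `[0, arity - 1]` for features and labels, and in `[1, arity]` for weights. An arity of zero yields a continuous double in `[0, 1)`. Below is a minimal, hypothetical sketch of those rules only. `ArityValueSketch` and the flat `[features..., label, weight]` row layout are assumptions. The real generator emits a `Table` with a dense-vector features column and is not reproduced here.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical sketch of the value ranges described by FEATURE_ARITY, LABEL_ARITY and
 * WEIGHT_ARITY. Not LabeledPointWithWeightGenerator's actual implementation.
 */
public class ArityValueSketch {

    /** Features and labels: arity > 0 gives an integer in [0, arity - 1]; arity == 0 a double in [0, 1). */
    static double featureOrLabel(Random random, int arity) {
        checkArity(arity);
        return arity > 0 ? random.nextInt(arity) : random.nextDouble();
    }

    /** Weights: arity > 0 gives an integer in [1, arity]; arity == 0 a double in [0, 1). */
    static double weight(Random random, int arity) {
        checkArity(arity);
        return arity > 0 ? random.nextInt(arity) + 1 : random.nextDouble();
    }

    /** Same constraint as ParamValidators.gtEq(0) in the diff. */
    private static void checkArity(int arity) {
        if (arity < 0) {
            throw new IllegalArgumentException("arity must be >= 0, got " + arity);
        }
    }

    /** One row laid out as [features..., label, weight] (assumed layout). */
    static double[] row(Random random, int vectorDim, int featureArity, int labelArity, int weightArity) {
        double[] values = new double[vectorDim + 2];
        for (int i = 0; i < vectorDim; i++) {
            values[i] = featureOrLabel(random, featureArity);
        }
        values[vectorDim] = featureOrLabel(random, labelArity);
        values[vectorDim + 1] = weight(random, weightArity);
        return values;
    }

    public static void main(String[] args) {
        Random random = new Random(0); // fixed seed so runs are reproducible
        // Default arities from the diff: featureArity = 2, labelArity = 2, weightArity = 1.
        for (int i = 0; i < 3; i++) {
            System.out.println(Arrays.toString(row(random, 3, 2, 2, 1)));
        }
    }
}
```

Under the defaults, features and labels are 0 or 1, and every weight is exactly 1, because `WEIGHT_ARITY` defaults to 1.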
[jira] [Assigned] (FLINK-26914) Notice files improvements in flink-kubernetes-operator
[ https://issues.apache.org/jira/browse/FLINK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang reassigned FLINK-26914: - Assignee: Yang Wang > Notice files improvements in flink-kubernetes-operator > -- > > Key: FLINK-26914 > URL: https://issues.apache.org/jira/browse/FLINK-26914 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Yang Wang >Assignee: Yang Wang >Priority: Major > Labels: starter > Fix For: kubernetes-operator-1.0.0 > > > See the discussion in this PR[1]. > Having the jars contain the NOTICE files is insufficient, as discoverability > within the image is basically 0. We should extract & merge the NOTICE files > for all jars, similarly to what we do for {{{}flink-dist{}}}. > > Another small improvement: Flink dependencies could be removed from > {{{}flink-kubernetes-operator/resources/META-INF/NOTICE{}}}. > > [1]. https://github.com/apache/flink-kubernetes-operator/pull/127 -- This message was sent by Atlassian Jira (v8.20.7#820007)
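FLINK-26914 asks to extract and merge the NOTICE files of all jars so they are discoverable inside the image. Below is a minimal, hypothetical sketch of that extract-and-concatenate step using only `java.util.jar`. It assumes each jar keeps its notice at `META-INF/NOTICE`, and the `----- <jar> -----` section header is an invented format. This is not how `flink-dist` actually builds its merged NOTICE, which likely also deduplicates entries. It requires Java 11+ for `readAllBytes` and `strip`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;

/**
 * Hypothetical sketch: read META-INF/NOTICE from each jar and concatenate them into one text,
 * each section headed by the jar it came from. Jars without a NOTICE are skipped.
 */
public class NoticeMergeSketch {

    static final String NOTICE_ENTRY = "META-INF/NOTICE";

    static String mergeNotices(List<Path> jars) throws IOException {
        StringBuilder merged = new StringBuilder();
        for (Path jar : jars) {
            try (JarFile jarFile = new JarFile(jar.toFile())) {
                ZipEntry entry = jarFile.getEntry(NOTICE_ENTRY);
                if (entry == null) {
                    continue; // this jar ships no NOTICE file
                }
                try (InputStream in = jarFile.getInputStream(entry)) {
                    String text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                    merged.append("----- ").append(jar.getFileName()).append(" -----\n")
                            .append(text.strip())
                            .append("\n\n");
                }
            }
        }
        return merged.toString();
    }

    public static void main(String[] args) throws IOException {
        // Usage: java NoticeMergeSketch lib/a.jar lib/b.jar ... > NOTICE
        List<Path> jars = new ArrayList<>();
        for (String arg : args) {
            jars.add(Paths.get(arg));
        }
        System.out.print(mergeNotices(jars));
    }
}
```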
[GitHub] [flink] snuyanzin commented on pull request #19754: [FLINK-27673][tests] Migrate flink-table-api-scala to JUnit5
snuyanzin commented on PR #19754: URL: https://github.com/apache/flink/pull/19754#issuecomment-1129451355 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5
snuyanzin commented on PR #19753: URL: https://github.com/apache/flink/pull/19753#issuecomment-1129436869 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph
xinbinhuang commented on PR #17873: URL: https://github.com/apache/flink/pull/17873#issuecomment-1129364011 @KarmaGYZ sorry for getting back to this so late. I finally got some time to sort out my laptop's build setup. Can you take a look at the PR again? thx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph
xinbinhuang commented on PR #17873: URL: https://github.com/apache/flink/pull/17873#issuecomment-1129363293 @flinkbot run azure Looks like the CI is stuck? (The `Build Flink` stage `e2c_ci_test2` has been running for more than 4 hrs.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin commented on code in PR #19716: URL: https://github.com/apache/flink/pull/19716#discussion_r875296706 ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java: ## @@ -108,55 +104,56 @@ public class FileSinkCompactionSwitchITCase extends TestLogger { protected static final int NUM_BUCKETS = 4; -@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); +@TempDir private static java.nio.file.Path tmpDir; -@Rule public final SharedObjects sharedObjects = SharedObjects.create(); +@RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create(); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin commented on code in PR #19716: URL: https://github.com/apache/flink/pull/19716#discussion_r875295088 ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java: ## @@ -23,26 +23,39 @@ import org.apache.flink.connector.file.table.PartitionWriter.Context; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.table.utils.LegacyRowResource; import org.apache.flink.types.Row; +import org.apache.flink.types.RowUtils; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for {@link PartitionWriter}s. */ -public class PartitionWriterTest { +class PartitionWriterTest { + +@TempDir private java.nio.file.Path tmpDir; -@Rule public final LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; Review Comment: ok, thanks, I've made these methods public and started to call them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin commented on code in PR #19716: URL: https://github.com/apache/flink/pull/19716#discussion_r875294629 ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ## @@ -147,39 +145,40 @@ public void testStaticPartition() throws Exception { testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L)); -assertEquals(1, getFileContentByPath(tmpFile).size()); +assertThat(getFileContentByPath(tmpPath)).hasSize(1); } ref.get().finalizeGlobal(1); -Map content = getFileContentByPath(outputFile); -assertEquals(1, content.size()); -assertEquals("c=p1", content.keySet().iterator().next().getParentFile().getName()); -assertEquals("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n", content.values().iterator().next()); -assertFalse(new File(tmpFile.toURI()).exists()); +Map content = getFileContentByPath(outputPath); +assertThat(content).hasSize(1); + assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1"); +assertThat(content.values().iterator().next()) +.isEqualTo("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"); +assertThat(new File(tmpPath.toUri())).doesNotExist(); } @Test -public void testDynamicPartition() throws Exception { +void testDynamicPartition() throws Exception { AtomicReference> ref = new AtomicReference<>(); try (OneInputStreamOperatorTestHarness testHarness = createSink(false, true, false, new LinkedHashMap<>(), ref)) { writeUnorderedRecords(testHarness); -assertEquals(2, getFileContentByPath(tmpFile).size()); +assertThat(getFileContentByPath(tmpPath)).hasSize(2); } ref.get().finalizeGlobal(1); -Map content = getFileContentByPath(outputFile); +Map content = getFileContentByPath(outputPath); Map sortedContent = new TreeMap<>(); content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); 
-assertEquals(2, sortedContent.size()); -assertEquals("a1,1\n" + "a2,2\n" + "a3,3\n", sortedContent.get("c=p1")); -assertEquals("a2,2\n", sortedContent.get("c=p2")); -assertFalse(new File(tmpFile.toURI()).exists()); +assertThat(sortedContent).hasSize(2); +assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + "a3,3\n"); +assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n"); Review Comment: done
[GitHub] [flink] flinkbot commented on pull request #19754: [FLINK-27673][tests] Migrate flink-table-api-scala to JUnit5
flinkbot commented on PR #19754: URL: https://github.com/apache/flink/pull/19754#issuecomment-1129356861 ## CI report: * 3c204d8b0a0493bfe50633bdb22b68ba76fe6c84 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin commented on code in PR #19716: URL: https://github.com/apache/flink/pull/19716#discussion_r875293489 ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java: ## @@ -47,20 +44,17 @@ public abstract class FileSinkITBase extends TestLogger { protected static final double FAILOVER_RATIO = 0.4; -@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - -@Parameterized.Parameter public boolean triggerFailover; - -@Parameterized.Parameters(name = "triggerFailover = {0}") -public static Collection params() { -return Arrays.asList(new Object[] {false}, new Object[] {true}); +public static Stream params() { Review Comment: yep, done
[GitHub] [flink] snuyanzin commented on a diff in pull request #19716: [FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
snuyanzin commented on code in PR #19716: URL: https://github.com/apache/flink/pull/19716#discussion_r875293274 ## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java: ## @@ -58,17 +56,15 @@ import java.util.stream.LongStream; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It trigger a savepoint for * the {@link StreamingFileSink} job and restore the {@link FileSink} job from the savepoint taken. */ -public class FileSinkMigrationITCase extends TestLogger { +class FileSinkMigrationITCase { -@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - -@Rule public final SharedObjects sharedObjects = SharedObjects.create(); +@RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create(); Review Comment: done
[jira] [Updated] (FLINK-27673) [JUnit5 Migration] Module: flink-table-api-scala
[ https://issues.apache.org/jira/browse/FLINK-27673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27673: Labels: pull-request-available (was: ) > [JUnit5 Migration] Module: flink-table-api-scala > Key: FLINK-27673 > URL: https://issues.apache.org/jira/browse/FLINK-27673 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Tests > Reporter: Sergey Nuyanzin > Priority: Major > Labels: pull-request-available -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] snuyanzin opened a new pull request, #19754: [FLINK-27673][tests] Migrate flink-table-api-scala to JUnit5
snuyanzin opened a new pull request, #19754: URL: https://github.com/apache/flink/pull/19754 ## What is the purpose of the change Update the flink-table-api-scala module to AssertJ and JUnit 5 following the [JUnit 5 Migration Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit) ## Brief change log Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest ## Verifying this change This change is a code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] rkhachatryan commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
rkhachatryan commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r875289701 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java: ## @@ -227,6 +229,14 @@ private static void registerSharedState( for (KeyedStateHandle stateHandle : stateHandles) { if (stateHandle != null) { stateHandle.registerSharedStates(sharedStateRegistry, checkpointID); +// Registering state handle to the given sharedStateRegistry serves two purposes: +// 1. let sharedStateRegistry be responsible for cleaning the state handle, Review Comment: The ID is formally necessary to implement `equals` (and `hashCode`). Otherwise, each time this ID is used, `SharedStateRegistry` will try to discard it. The discard itself is a no-op, but deletions are currently scheduled one by one on the IO thread pool, so this might delay other IO tasks and/or lengthen the queue inside the pool.
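Below is a minimal, self-contained toy model of the point in the review comment above. It makes a simplifying assumption that does not reflect Flink's actual `SharedStateRegistry` API: the registry keeps the first handle registered under a key and schedules every later handle that is *unequal* to it for discard. All class names are hypothetical. Without a value-based `equals`/`hashCode`, each re-registration of the same logical handle compares unequal by identity, so one more (no-op) discard is scheduled each time:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RegistrySketch {

    /** Hypothetical placeholder handle whose equality is based on a stable ID. */
    static final class IdHandle {
        private final String id;

        IdHandle(String id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof IdHandle && ((IdHandle) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** The same kind of handle without equals/hashCode: Object identity is used instead. */
    static final class IdentityHandle {}

    /** Toy registry: keeps the first handle per key and schedules unequal duplicates for discard. */
    static final class ToyRegistry {
        private final Map<String, Object> entries = new HashMap<>();
        private int scheduledDiscards = 0;

        void register(String key, Object handle) {
            Object existing = entries.putIfAbsent(key, handle);
            if (existing != null && !Objects.equals(existing, handle)) {
                // Per the review comment, each such discard becomes one task on the IO pool;
                // this sketch only counts them.
                scheduledDiscards++;
            }
        }

        int scheduledDiscards() {
            return scheduledDiscards;
        }
    }

    public static void main(String[] args) {
        ToyRegistry withIds = new ToyRegistry();
        ToyRegistry withoutIds = new ToyRegistry();
        // Simulate the same logical handle being registered by three checkpoints.
        for (int checkpoint = 1; checkpoint <= 3; checkpoint++) {
            withIds.register("shared-handle", new IdHandle("A"));
            withoutIds.register("shared-handle", new IdentityHandle());
        }
        System.out.println(withIds.scheduledDiscards());    // prints 0
        System.out.println(withoutIds.scheduledDiscards()); // prints 2
    }
}
```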
[jira] [Updated] (FLINK-27669) [JUnit5 Migration] Module: flink-file-sink-common
[ https://issues.apache.org/jira/browse/FLINK-27669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-27669: Component/s: Tests > [JUnit5 Migration] Module: flink-file-sink-common > Key: FLINK-27669 > URL: https://issues.apache.org/jira/browse/FLINK-27669 > Project: Flink > Issue Type: Sub-task > Components: Tests > Reporter: Sergey Nuyanzin > Priority: Major > Labels: pull-request-available
[jira] [Updated] (FLINK-27672) [JUnit5 Migration] Module: flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-27672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-27672: Component/s: Table SQL / API, Tests > [JUnit5 Migration] Module: flink-table-common > Key: FLINK-27672 > URL: https://issues.apache.org/jira/browse/FLINK-27672 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Tests > Reporter: Sergey Nuyanzin > Priority: Major > Labels: pull-request-available
[jira] [Created] (FLINK-27673) [JUnit5 Migration] Module: flink-table-api-scala
Sergey Nuyanzin created FLINK-27673: Summary: [JUnit5 Migration] Module: flink-table-api-scala Key: FLINK-27673 URL: https://issues.apache.org/jira/browse/FLINK-27673 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Tests Reporter: Sergey Nuyanzin
[GitHub] [flink] flinkbot commented on pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5
flinkbot commented on PR #19753: URL: https://github.com/apache/flink/pull/19753#issuecomment-1129347186 ## CI report: * af3374e392677cc6ab2161dbd25185e0215999d5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] snuyanzin commented on a diff in pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5
snuyanzin commented on code in PR #19753: URL: https://github.com/apache/flink/pull/19753#discussion_r875285230 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorTestBase.java: ## @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.util.Preconditions; - -import org.junit.Test; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Test base for testing {@link Descriptor} together with {@link DescriptorValidator}. */ -public abstract class DescriptorTestBase { Review Comment: Removed, as it is abstract and has no subclasses or usages.
[jira] [Updated] (FLINK-27672) [JUnit5 Migration] Module: flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-27672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27672: Labels: pull-request-available (was: ) > [JUnit5 Migration] Module: flink-table-common > Key: FLINK-27672 > URL: https://issues.apache.org/jira/browse/FLINK-27672 > Project: Flink > Issue Type: Sub-task > Reporter: Sergey Nuyanzin > Priority: Major > Labels: pull-request-available
[GitHub] [flink] snuyanzin opened a new pull request, #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5
snuyanzin opened a new pull request, #19753: URL: https://github.com/apache/flink/pull/19753 ## What is the purpose of the change Update the flink-table-common module to AssertJ and JUnit 5 following the [JUnit 5 Migration Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit) ## Brief change log Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest ## Verifying this change This change is a code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Created] (FLINK-27672) [JUnit5 Migration] Module: flink-table-common
Sergey Nuyanzin created FLINK-27672: Summary: [JUnit5 Migration] Module: flink-table-common Key: FLINK-27672 URL: https://issues.apache.org/jira/browse/FLINK-27672 Project: Flink Issue Type: Sub-task Reporter: Sergey Nuyanzin
[jira] [Commented] (FLINK-27650) First environment variable of top level pod template is lost
[ https://issues.apache.org/jira/browse/FLINK-27650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538399#comment-17538399 ] Simon Paradis commented on FLINK-27650: Okay, then we can define all variables at the top level and override them as needed. > First environment variable of top level pod template is lost > Key: FLINK-27650 > URL: https://issues.apache.org/jira/browse/FLINK-27650 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator > Affects Versions: kubernetes-operator-0.1.0 > Reporter: Simon Paradis > Priority: Major > Fix For: kubernetes-operator-1.0.0 > Attachments: flink-27650.yaml > I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template feature to inject environment variables that control structured JSON logging. > I noticed that the first defined environment variable is never injected into the JobManager or TaskManager pods. The workaround is to define a dummy env var. > Here's the manifest template. It is processed by a tool that first expands ${ENV_VAR} references with values provided by our CI pipeline. We should not have to create the FLINK_COORDINATES_DUMMY env var.
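Below is a minimal sketch of the "define everything at the top level, override per component" workflow from Simon's comment. It models env vars as a name-to-value map and assumes override-by-name semantics. It is not the operator's pod-template merge code, whose 0.1.0 behavior is what this issue reports as losing the first variable. `EnvMergeSketch` and the variable names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EnvMergeSketch {

    /**
     * Merges environment variables by name: every top-level entry is kept, and an entry with
     * the same name in the component-level template overrides the top-level value.
     */
    static Map<String, String> merge(Map<String, String> topLevel, Map<String, String> component) {
        Map<String, String> merged = new LinkedHashMap<>(topLevel);
        merged.putAll(component); // existing names keep their position; their values are replaced
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> topLevel = new LinkedHashMap<>();
        topLevel.put("LOG_FORMAT", "json"); // hypothetical variable names
        topLevel.put("LOG_LEVEL", "INFO");

        Map<String, String> jobManager = new LinkedHashMap<>();
        jobManager.put("LOG_LEVEL", "DEBUG");

        System.out.println(merge(topLevel, jobManager)); // prints {LOG_FORMAT=json, LOG_LEVEL=DEBUG}
    }
}
```

Under this model, the first top-level variable always survives the merge, so a dummy placeholder variable would not be needed.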
[GitHub] [flink-web] MartijnVisser merged pull request #539: add low latency techniques blog post part1
MartijnVisser merged PR #539: URL: https://github.com/apache/flink-web/pull/539
[GitHub] [flink] MartijnVisser merged pull request #19748: [hotfix][python] Fix lint-python typo
MartijnVisser merged PR #19748: URL: https://github.com/apache/flink/pull/19748
[jira] [Updated] (FLINK-27665) Optimize event triggering on DeploymentFailedExceptions
[ https://issues.apache.org/jira/browse/FLINK-27665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27665: Labels: pull-request-available (was: ) > Optimize event triggering on DeploymentFailedExceptions > Key: FLINK-27665 > URL: https://issues.apache.org/jira/browse/FLINK-27665 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator > Affects Versions: kubernetes-operator-0.1.0 > Reporter: Matyas Orhidi > Assignee: Matyas Orhidi > Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.0.0 > Attachments: image-2022-05-17-12-08-42-597.png, image-2022-05-17-12-13-19-489.png > Use `EventUtils` when handling `DeploymentFailedExceptions` to avoid appending new events on every reconcile loop: > [screenshot: image-2022-05-17-12-13-19-489.png]
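Below is a minimal sketch of the deduplication idea in the issue above. It assumes events are identified by (type, reason, message). A repeated failure then bumps a counter on the existing event instead of appending a new event on every reconcile loop. `EventRecorder` and `Event` are hypothetical stand-ins, not the operator's `EventUtils` API. The counter only loosely mirrors the `count` field of Kubernetes core/v1 events:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EventDedupSketch {

    /** Hypothetical, simplified event; real Kubernetes events carry many more fields. */
    static final class Event {
        final String type; // "Normal" or "Warning"
        final String reason;
        final String message;
        int count = 1;

        Event(String type, String reason, String message) {
            this.type = type;
            this.reason = reason;
            this.message = message;
        }
    }

    /** Records each (type, reason, message) once and increments a counter on repeats. */
    static final class EventRecorder {
        // List keys compare element by element, so the three parts cannot collide.
        private final Map<List<String>, Event> events = new LinkedHashMap<>();

        /** Returns true if a new event was created, false if an existing one was updated. */
        boolean record(String type, String reason, String message) {
            List<String> key = Arrays.asList(type, reason, message);
            Event existing = events.get(key);
            if (existing != null) {
                existing.count++;
                return false;
            }
            events.put(key, new Event(type, reason, message));
            return true;
        }

        List<Event> events() {
            return new ArrayList<>(events.values());
        }
    }

    public static void main(String[] args) {
        EventRecorder recorder = new EventRecorder();
        // Five reconcile loops hit the same (hypothetical) deployment failure.
        for (int loop = 0; loop < 5; loop++) {
            recorder.record("Warning", "DeploymentFailed", "JobManager deployment failed");
        }
        Event event = recorder.events().get(0);
        System.out.println(recorder.events().size() + " event(s), count=" + event.count);
        // prints "1 event(s), count=5"
    }
}
```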
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #220: [FLINK-27665] Optimize event triggering on DeploymentFailedExceptions
morhidi commented on PR #220: URL: https://github.com/apache/flink-kubernetes-operator/pull/220#issuecomment-1129103487 cc @wangyang0918 @Aitozi
[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #220: [FLINK-27665] Optimize event triggering on DeploymentFailedExceptions
morhidi opened a new pull request, #220: URL: https://github.com/apache/flink-kubernetes-operator/pull/220 - Separated the event-related logic from the DeploymentFailedExceptions - Introduced a new component type for JobManagerDeployment, reporting DeploymentFailedExceptions - Reduced the event types to Warning and Normal only; see [API docs](https://github.com/kubernetes/api/blob/master/events/v1/types.go#L84)
[GitHub] [flink] snuyanzin commented on pull request #19750: [FLINK-27669][tests] Migrate flink-file-sink-common to JUnit5
snuyanzin commented on PR #19750: URL: https://github.com/apache/flink/pull/19750#issuecomment-1129088650 @flinkbot run azure
[jira] [Commented] (FLINK-27671) Publish SNAPSHOT docker images
[ https://issues.apache.org/jira/browse/FLINK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538319#comment-17538319 ] Márton Balassi commented on FLINK-27671: FYI: [https://github.com/apache/flink-kubernetes-operator/blob/main/.github/workflows/docker_push.yml] Please add support for the ARM architecture too, as the above GHA implementation does. > Publish SNAPSHOT docker images > Key: FLINK-27671 > URL: https://issues.apache.org/jira/browse/FLINK-27671 > Project: Flink > Issue Type: Improvement > Reporter: Alexander Fedulov > Assignee: Alexander Fedulov > Priority: Major > A discussion on the Flink-Dev mailing list [1] concluded that it is desirable to add the ability to publish SNAPSHOT Docker images of Apache Flink to GHCR. > [1] [https://lists.apache.org/thread/0h60qz8vrw980n1vscz3pxcsv3c3h24m]
[GitHub] [flink] flinkbot commented on pull request #19752: [FLINK-27340][tests] Migrate module flink-python to JUnit5
flinkbot commented on PR #19752: URL: https://github.com/apache/flink/pull/19752#issuecomment-1129072997 ## CI report: * c3f34db24204a4d4ae5544688139b3de3f143fc1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-27340) [JUnit5 Migration] Module: flink-python
[ https://issues.apache.org/jira/browse/FLINK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-27340: Labels: pull-request-available (was: ) > [JUnit5 Migration] Module: flink-python > Key: FLINK-27340 > URL: https://issues.apache.org/jira/browse/FLINK-27340 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Tests > Reporter: Sergey Nuyanzin > Priority: Major > Labels: pull-request-available
[GitHub] [flink] snuyanzin opened a new pull request, #19752: [FLINK-27340][tests] Migrate module flink-python to JUnit5
snuyanzin opened a new pull request, #19752: URL: https://github.com/apache/flink/pull/19752 ## What is the purpose of the change Update the flink-python module to AssertJ and JUnit 5 following the [JUnit 5 Migration Guide](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit) ## Brief change log Use JUnit5 and AssertJ in tests instead of JUnit4 and Hamcrest ## Verifying this change This change is a code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink-docker] afedulov commented on pull request #116: Publish snapshot images
afedulov commented on PR #116: URL: https://github.com/apache/flink-docker/pull/116#issuecomment-1129067361 @zentol I cleaned up the PR and resolved conflicts with master, PTAL. The main question for me is where this change should be merged. IIUC, the issue is that the add-custom.sh script is not available on master, while scheduled runs can only be executed there. Would you be OK with adding add-custom.sh to master in that case?
[GitHub] [flink] MartijnVisser merged pull request #19749: [FLINK-27167][test] change the junit-jupiter dependency scope to compile
MartijnVisser merged PR #19749: URL: https://github.com/apache/flink/pull/19749
[jira] [Created] (FLINK-27671) Publish SNAPSHOT docker images
Alexander Fedulov created FLINK-27671: Summary: Publish SNAPSHOT docker images Key: FLINK-27671 URL: https://issues.apache.org/jira/browse/FLINK-27671 Project: Flink Issue Type: Improvement Reporter: Alexander Fedulov Assignee: Alexander Fedulov A discussion on the Flink-Dev mailing list [1] concluded that it is desirable to add the ability to publish SNAPSHOT Docker images of Apache Flink to GHCR. [1] [https://lists.apache.org/thread/0h60qz8vrw980n1vscz3pxcsv3c3h24m]
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #187: [FLINK-27443] Create standalone mode parameters and decorators for JM and TMs
gyfora commented on code in PR #187: URL: https://github.com/apache/flink-kubernetes-operator/pull/187#discussion_r874958360 ## flink-kubernetes-standalone-cluster/pom.xml: ## @@ -0,0 +1,87 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + +flink-kubernetes-operator-parent +org.apache.flink +1.0-SNAPSHOT +.. + + + + +flink-kubernetes-standalone-cluster Review Comment: Maybe a more suitable name would be simply `flink-kubernetes-standalone` ## flink-kubernetes-standalone-cluster/pom.xml: ## @@ -0,0 +1,87 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + +flink-kubernetes-operator-parent +org.apache.flink +1.0-SNAPSHOT +.. + + + + +flink-kubernetes-standalone-cluster +Flink Kubernetes Standalone Kube Client Review Comment: I think we can simply name this `Flink Kubernetes Standalone`
[GitHub] [flink] echauchot commented on a diff in pull request #19680: [FLINK-27457] Implement flush() logic in Cassandra output formats
echauchot commented on code in PR #19680: URL: https://github.com/apache/flink/pull/19680#discussion_r874964198 ## flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java: ## @@ -17,130 +17,191 @@ package org.apache.flink.batch.connectors.cassandra; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.cassandra.utils.SinkUtils; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import org.apache.flink.util.Preconditions; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Session; -import com.google.common.base.Strings; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** - * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra. + * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra using + * output formats. * * @param Type of the elements to write. 
*/ -public abstract class CassandraOutputFormatBase extends RichOutputFormat { +public abstract class CassandraOutputFormatBase extends RichOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormatBase.class); -private final String insertQuery; private final ClusterBuilder builder; +private Semaphore semaphore; +private Duration maxConcurrentRequestsTimeout = Duration.ofMillis(Long.MAX_VALUE); +private int maxConcurrentRequests = Integer.MAX_VALUE; private transient Cluster cluster; -private transient Session session; -private transient PreparedStatement prepared; -private transient FutureCallback callback; -private transient Throwable exception = null; +protected transient Session session; +private transient FutureCallback callback; +private AtomicReference throwable; -public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder) { -Preconditions.checkArgument( -!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty"); +public CassandraOutputFormatBase(ClusterBuilder builder) { Preconditions.checkNotNull(builder, "Builder cannot be null"); - -this.insertQuery = insertQuery; this.builder = builder; } +/** + * Sets the maximum allowed number of concurrent requests for this output format. + * + * @param maxConcurrentRequestsTimeout timeout duration when acquiring a permit to execute + */ +public void setMaxConcurrentRequestsTimeout(Duration maxConcurrentRequestsTimeout) { Review Comment: :+1:
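Below is a minimal sketch of the pattern the new fields in this diff suggest:

- a `Semaphore` capping in-flight requests;
- a timeout for acquiring a permit;
- an `AtomicReference` holding the first asynchronous failure;
- a `flush()` that waits by draining all permits.

It swaps Guava's `ListenableFuture`/`FutureCallback` for the JDK's `CompletableFuture` so it runs without Cassandra or Guava. `BoundedAsyncWriter`, `write` and `flush` are hypothetical names, not the actual `CassandraOutputFormatBase` implementation:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical sketch: caps in-flight async writes with a Semaphore and flushes by draining it. */
public class BoundedAsyncWriter {
    private final int maxConcurrentRequests;
    private final Duration maxConcurrentRequestsTimeout;
    private final Semaphore semaphore;
    private final AtomicReference<Throwable> throwable = new AtomicReference<>();

    public BoundedAsyncWriter(int maxConcurrentRequests, Duration maxConcurrentRequestsTimeout) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
        this.semaphore = new Semaphore(maxConcurrentRequests);
    }

    /** Waits up to the timeout for a permit, then starts the async request. */
    public void write(Supplier<CompletableFuture<Void>> request) throws Exception {
        checkAsyncErrors();
        if (!semaphore.tryAcquire(maxConcurrentRequestsTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Could not acquire a permit within " + maxConcurrentRequestsTimeout);
        }
        CompletableFuture<Void> future;
        try {
            future = request.get();
        } catch (RuntimeException e) {
            semaphore.release(); // do not leak the permit if the request fails to start
            throw e;
        }
        future.whenComplete(
                (ignored, error) -> {
                    if (error != null) {
                        throwable.compareAndSet(null, error); // remember the first failure
                    }
                    semaphore.release();
                });
    }

    /** Blocks until every in-flight request has completed, then rethrows any async failure. */
    public void flush() throws Exception {
        if (!semaphore.tryAcquire(
                maxConcurrentRequests, maxConcurrentRequestsTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Pending requests did not finish within " + maxConcurrentRequestsTimeout);
        }
        semaphore.release(maxConcurrentRequests);
        checkAsyncErrors();
    }

    private void checkAsyncErrors() throws Exception {
        Throwable error = throwable.getAndSet(null);
        if (error != null) {
            throw new Exception("Asynchronous write failed", error);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger written = new AtomicInteger();
        BoundedAsyncWriter writer = new BoundedAsyncWriter(2, Duration.ofSeconds(5));
        for (int i = 0; i < 10; i++) {
            // At most two of these simulated writes are in flight at any time.
            writer.write(() -> CompletableFuture.runAsync(written::incrementAndGet, pool));
        }
        writer.flush(); // returns only after all ten writes have completed
        System.out.println(written.get()); // prints 10
        pool.shutdown();
    }
}
```

`flush()` acquires all `maxConcurrentRequests` permits, so it waits for every in-flight request without keeping a list of futures. It then releases the permits again, so writing can continue afterwards.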
[GitHub] [flink-docker] afedulov commented on a diff in pull request #116: Publish snapshot images
afedulov commented on code in PR #116: URL: https://github.com/apache/flink-docker/pull/116#discussion_r866937358 ## .github/workflows/snapshot.yml: ## @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "Publish SNAPSHOTs" + +on: + # schedule: + # - cron: '0 0 * * *' # Deploy every day + push: +branches: + - snapshot-dev-master + +env: + REGISTRY: ghcr.io + VERSION: 1.16-SNAPSHOT + +jobs: + ci: +uses: ./.github/workflows/ci.yml + snapshot: +needs: ci +runs-on: ubuntu-latest +permissions: + contents: read + packages: write +strategy: + matrix: +config: + - java_version: 8 + - java_version: 11 +steps: + - name: Set env +run: | + repo=${{ github.repository }}# apache/flink-docker + echo "IMAGE_REPO=$(echo ${repo##*/})" >> $GITHUB_ENV # apache + echo "OWNER=$(echo ${repo%%/*})" >> $GITHUB_ENV # flink-docker + - uses: actions/checkout@v3 + - name: Variables +run: | + echo "OWNER: $OWNER" + echo "IMAGE_REPO: $IMAGE_REPO" + - name: Prepare Dockerfiles +run: | + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-${VERSION}-bin-scala_2.12.tgz" -j ${{ matrix.config.java_version }} -n ${VERSION}-scala_2.12-java${{ matrix.config.java_version }} Review Comment: I thought that we were going to put snapshot.yml
on the dev-master branch. If I understand you correctly, this won't work with the scheduled runs, because they can only be triggered from master. Is my understanding correct?
[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
fredia commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r874963782 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -47,23 +50,33 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { /** All registered state objects by an artificial key */ private final Map registeredStates; +/** All registered checkpoints */ +private final Set registeredCheckpoints; Review Comment: I've replaced `Set` with `Map` for now. > Do you think it could/should be refactored so that CompletedCheckpointStore "transfers the ownership" of subsumed checkpoints to SharedStateRegistry? > E.g. by passing them to unregisterUnusedState. I think that would work well; I'll try passing `DefaultCompletedCheckpointStore.completedCheckpoints` to unregisterUnusedState later.
[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
fredia commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r874957754 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java: ## @@ -227,6 +229,14 @@ private static void registerSharedState( for (KeyedStateHandle stateHandle : stateHandles) { if (stateHandle != null) { stateHandle.registerSharedStates(sharedStateRegistry, checkpointID); +// Registering state handle to the given sharedStateRegistry serves two purposes: +// 1. let sharedStateRegistry be responsible for cleaning the state handle, +// 2. update the status of the checkpoint in sharedStateRegistry to which the state +// handle belongs +sharedStateRegistry.registerReference( +new SharedStateRegistryKey(stateHandle.getStateHandleId().getKeyString()), +new StreamStateHandleWrapper(stateHandle), Review Comment: > can we get rid of the same call in ChangelogStateBackendHandle? We can't: the key of the changelog handle is not the same as the key of the KeyedStateHandle inside the changelog handle.
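The comment in the diff says that registering a handle lets the registry take over its cleanup and track which checkpoint uses it. As a simplified illustration of that idea (not Flink's `SharedStateRegistryImpl`, which does considerably more), the sketch below keys entries by a string key. Each entry remembers the last checkpoint ID that referenced it, and `unregisterUnusedState(lowestRetainedCheckpointId)` discards entries that no retained checkpoint still uses. `SimpleSharedStateRegistry` and the string "handles" are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified registry; not Flink's SharedStateRegistryImpl. */
class SimpleSharedStateRegistry {
    /** A registered handle plus the most recent checkpoint that referenced it. */
    private static final class Entry {
        final String handle;
        long lastUsedCheckpointId;

        Entry(String handle, long checkpointId) {
            this.handle = handle;
            this.lastUsedCheckpointId = checkpointId;
        }
    }

    private final Map<String, Entry> registeredStates = new HashMap<>();
    /** Handles discarded so far; a real registry would delete the underlying files. */
    final List<String> discarded = new ArrayList<>();

    /** Registers (or re-references) a handle under the key; returns the handle to keep using. */
    synchronized String registerReference(String key, String handle, long checkpointId) {
        Entry entry = registeredStates.get(key);
        if (entry == null) {
            entry = new Entry(handle, checkpointId);
            registeredStates.put(key, entry);
        } else {
            // Key already known: keep the registered handle, only bump its last use.
            entry.lastUsedCheckpointId = Math.max(entry.lastUsedCheckpointId, checkpointId);
        }
        return entry.handle;
    }

    /** Discards every handle not used by any checkpoint >= lowestRetainedCheckpointId. */
    synchronized void unregisterUnusedState(long lowestRetainedCheckpointId) {
        Iterator<Entry> it = registeredStates.values().iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.lastUsedCheckpointId < lowestRetainedCheckpointId) {
                discarded.add(entry.handle);
                it.remove();
            }
        }
    }
}
```

Because entries are looked up by key, two different keys (as with a changelog handle and the keyed handle nested in it) are tracked as independent entries.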
[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
fredia commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r874950094 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java: ## @@ -137,23 +134,19 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne( completedCheckpoints.addLast(checkpoint); +// Register checkpoint to SharedStateRegistry, and checkpoint would be discarded during +// unregistering. +getSharedStateRegistry().registerCompletedCheckpoint(checkpoint); + +// Remove completed checkpoint from queue and checkpointStateHandleStore, not discard. Optional subsume = CheckpointSubsumeHelper.subsume( completedCheckpoints, maxNumberOfCheckpointsToRetain, -completedCheckpoint -> -tryRemoveCompletedCheckpoint( -completedCheckpoint, - completedCheckpoint.shouldBeDiscardedOnSubsume(), -checkpointsCleaner, -postCleanup)); +completedCheckpoint -> tryRemove(completedCheckpoint.getCheckpointID())); Review Comment: Yes, it is a duplicate call to clean up the checkpoint. I now pass an empty subsume action to `unregisterUnusedState` to avoid the duplicate cleanup.
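In the diff, `CheckpointSubsumeHelper.subsume(completedCheckpoints, maxNumberOfCheckpointsToRetain, action)` trims the store, and the action now only removes a checkpoint from the store instead of discarding it. Below is a hypothetical sketch of the "retain at most N, hand each removed one to an action" step over a deque of checkpoint IDs. The real helper applies additional rules that this sketch omits. `SubsumeSketch` is an invented name, and returning the newest subsumed ID only loosely mirrors the `Optional` result in the diff.

```java
import java.util.Deque;
import java.util.Optional;
import java.util.function.LongConsumer;

/** Hypothetical sketch of "keep the newest N checkpoints"; not Flink's CheckpointSubsumeHelper. */
final class SubsumeSketch {
    private SubsumeSketch() {}

    /**
     * Removes the oldest IDs (head of the deque) until at most numToRetain remain, calling
     * subsumeAction for each removed ID. Returns the newest subsumed ID, if any.
     */
    static Optional<Long> subsume(Deque<Long> checkpoints, int numToRetain, LongConsumer subsumeAction) {
        if (numToRetain < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        Long lastSubsumed = null;
        while (checkpoints.size() > numToRetain) {
            long oldest = checkpoints.removeFirst();
            // In the diff this action only removes the checkpoint from the store; discarding
            // shared state is left to the SharedStateRegistry, so nothing is cleaned up twice.
            subsumeAction.accept(oldest);
            lastSubsumed = oldest;
        }
        return Optional.ofNullable(lastSubsumed);
    }
}
```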
[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
fredia commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r874946254 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -137,12 +150,26 @@ public StreamStateHandle registerReference( return entry.stateHandle; } +@Override +public void registerCompletedCheckpoint(CompletedCheckpoint checkpoint) { +synchronized (registeredCheckpoints) { +LOG.trace("Register checkpoint {}.", checkpoint.getCheckpointID()); +registeredCheckpoints.add(checkpoint); +} +} + +@Override +public void setPostCleanAction(Runnable postCleanAction) { +this.postCleanAction = postCleanAction; Review Comment: Nice suggestion. I now pass the `subsumAction` as an argument to `unregisterUnusedState`, and the subsume action includes `postCleanAction`.
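The reply above replaces the `setPostCleanAction` setter with an action passed directly to `unregisterUnusedState`, and the earlier reply switches the registered checkpoints from a `Set` to a `Map`. A hypothetical sketch combining both ideas follows. `CheckpointTracker` is an invented name, and checkpoints are modeled as strings keyed by ID. Because the caller supplies the action on each call, there is no mutable callback field that must be set in the right order before it is used.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical sketch; not Flink's SharedStateRegistryImpl. */
class CheckpointTracker {
    /** Registered checkpoints keyed (and ordered) by checkpoint ID. */
    private final Map<Long, String> registeredCheckpoints = new TreeMap<>();

    void registerCompletedCheckpoint(long checkpointId, String checkpoint) {
        synchronized (registeredCheckpoints) {
            registeredCheckpoints.put(checkpointId, checkpoint);
        }
    }

    /**
     * Drops every checkpoint older than lowestRetainedId and hands each one to the
     * subsumeAction given by the caller, instead of a callback stored through a setter.
     */
    void unregisterUnusedState(long lowestRetainedId, Consumer<String> subsumeAction) {
        List<String> subsumed = new ArrayList<>();
        synchronized (registeredCheckpoints) {
            Iterator<Map.Entry<Long, String>> it = registeredCheckpoints.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, String> e = it.next();
                if (e.getKey() < lowestRetainedId) {
                    subsumed.add(e.getValue());
                    it.remove();
                }
            }
        }
        // Run the caller's action outside the lock, oldest checkpoint first.
        subsumed.forEach(subsumeAction);
    }

    int size() {
        synchronized (registeredCheckpoints) {
            return registeredCheckpoints.size();
        }
    }
}
```

Passing an empty action, as described in the reply about duplicate cleanup, is then a one-argument change at the call site rather than a separate setter call.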
[jira] [Commented] (FLINK-27670) Python wrappers for Kinesis Sinks
[ https://issues.apache.org/jira/browse/FLINK-27670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538256#comment-17538256 ] Zichen Liu commented on FLINK-27670: This task is the Sink portion of the parent task FLINK-21966 > Python wrappers for Kinesis Sinks > - > > Key: FLINK-27670 > URL: https://issues.apache.org/jira/browse/FLINK-27670 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Zichen Liu >Priority: Major > Fix For: 1.15.1 > > > Create Python Wrappers for the new Kinesis Streams sink and the Kinesis > Firehose sink. > An example implementation may be found here > [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] afedulov commented on pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj
afedulov commented on PR #19660: URL: https://github.com/apache/flink/pull/19660#issuecomment-1128983193 The connector-hive module looks good apart from the potential improvement here: https://github.com/apache/flink/pull/19660#discussion_r874931314. It can be merged once that is addressed.
[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
fredia commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r874932077 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java: ## @@ -227,6 +229,14 @@ private static void registerSharedState( for (KeyedStateHandle stateHandle : stateHandles) { if (stateHandle != null) { stateHandle.registerSharedStates(sharedStateRegistry, checkpointID); +// Registering state handle to the given sharedStateRegistry serves two purposes: +// 1. let sharedStateRegistry be responsible for cleaning the state handle, Review Comment: Nice suggestion! I think the ID is also unnecessary, because SharedStateRegistryKey already contains the state handle ID.
[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj
afedulov commented on code in PR #19660: URL: https://github.com/apache/flink/pull/19660#discussion_r874931314 ## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java: ## @@ -437,12 +431,12 @@ public void testCustomPartitionCommitPolicyNotFound() { testStreamingWriteWithCustomPartitionCommitPolicy(customCommitPolicyClassName); fail("ExecutionException expected"); } catch (Exception e) { -assertTrue( -ExceptionUtils.findThrowableWithMessage( +assertThat( Review Comment: We could use `hasStackTraceContaining` in the following check to avoid the complex `satisfies` logic.
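AssertJ's `hasStackTraceContaining` (for example `assertThatThrownBy(...).hasStackTraceContaining("...")`) checks the printed stack trace. That trace includes every "Caused by:" section, so a message anywhere in the cause chain is found without walking causes manually. To show roughly what the check does using only the JDK, here is a sketch with a hypothetical helper, `StackTraceCheck`; it is not AssertJ's implementation.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical stdlib helper approximating what hasStackTraceContaining checks. */
final class StackTraceCheck {
    private StackTraceCheck() {}

    /** Renders the full stack trace, including every "Caused by:" section. */
    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw)) {
            t.printStackTrace(pw);
        }
        return sw.toString();
    }

    /** True if the text appears anywhere in the printed trace, e.g. in a nested cause's message. */
    static boolean stackTraceContains(Throwable t, String text) {
        return stackTraceOf(t).contains(text);
    }
}
```

One caveat: substring matching over the whole trace also matches class names and stack frames, so it is worth asserting on a distinctive message such as the missing policy class name.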
[jira] [Updated] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-24433: --- Fix Version/s: (was: 1.14.5) (was: 1.15.1) > "No space left on device" in Azure e2e tests > > > Key: FLINK-24433 > URL: https://issues.apache.org/jira/browse/FLINK-24433 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.15.0 >Reporter: Dawid Wysakowicz >Assignee: Martijn Visser >Priority: Blocker > Labels: auto-deprioritized-critical, pull-request-available, > test-stability > Fix For: 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=24668=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=070ff179-953e-5bda-71fa-d6599415701c=19772 > {code} > Sep 30 17:08:42 Job has been submitted with JobID > 5594c18e128a328ede39cfa59cb3cb07 > Sep 30 17:08:56 2021-09-30 17:08:56,809 main ERROR Recovering from > StringBuilderEncoder.encode('2021-09-30 17:08:56,807 WARN > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An > exception occurred when fetching query results > Sep 30 17:08:56 java.util.concurrent.ExecutionException: > org.apache.flink.runtime.rest.util.RestClientException: [Internal server > error., Sep 30 17:08:56 org.apache.flink.runtime.messages.FlinkJobNotFoundException: > Could not find Flink job (5594c18e128a328ede39cfa59cb3cb07) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:923) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:937) > Sep 30 17:08:56 at > org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordina2021-09-30T17:08:57.1584224Z > ##[error]No space left on device > {code}
[jira] [Closed] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-24433. -- Resolution: Fixed > "No space left on device" in Azure e2e tests > > > Key: FLINK-24433 > URL: https://issues.apache.org/jira/browse/FLINK-24433 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.15.0, 1.16.0 >Reporter: Dawid Wysakowicz >Assignee: Martijn Visser >Priority: Blocker > Labels: auto-deprioritized-critical, pull-request-available, > test-stability > Fix For: 1.16.0
[jira] [Updated] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-24433: --- Affects Version/s: 1.16.0 > "No space left on device" in Azure e2e tests > > > Key: FLINK-24433 > URL: https://issues.apache.org/jira/browse/FLINK-24433 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.15.0, 1.16.0 >Reporter: Dawid Wysakowicz >Assignee: Martijn Visser >Priority: Blocker > Labels: auto-deprioritized-critical, pull-request-available, > test-stability > Fix For: 1.16.0
[jira] [Commented] (FLINK-24433) "No space left on device" in Azure e2e tests
[ https://issues.apache.org/jira/browse/FLINK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538252#comment-17538252 ] Martijn Visser commented on FLINK-24433: After further investigation, we found that, by default, Elasticsearch stops allocating new shards to a node once its disk usage reaches 85%, and at higher usage it relocates shards and eventually makes indices read-only. On the Azure E2E test machines this is quite common. We therefore disable the entire disk allocation decider, as documented at https://www.elastic.co/guide/en/elasticsearch/reference/6.2/disk-allocator.html Fixed in master: 86cf5247c564a6a17a41c0a858e838ca975c9a22 > "No space left on device" in Azure e2e tests > > > Key: FLINK-24433 > URL: https://issues.apache.org/jira/browse/FLINK-24433 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines >Affects Versions: 1.15.0 >Reporter: Dawid Wysakowicz >Assignee: Martijn Visser >Priority: Blocker > Labels: auto-deprioritized-critical, pull-request-available, > test-stability > Fix For: 1.16.0, 1.14.5, 1.15.1
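The linked Elasticsearch 6.2 page documents `cluster.routing.allocation.disk.threshold_enabled`, which defaults to `true`; setting it to `false` disables the disk allocation decider. It is a dynamic setting, so one way to apply it to a running cluster is a `PUT` to the `_cluster/settings` endpoint. This thread does not say how the fix configures it (config file, container environment, or API call), so the following is only a sketch of the API route. `DiskDeciderRequest` and the `localhost:9200` address are hypothetical. It uses `java.net.http` (Java 11+) and builds the request without sending it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch: builds (but does not send) a request that disables the disk allocation decider. */
final class DiskDeciderRequest {
    private DiskDeciderRequest() {}

    /** Persistent cluster setting; false turns off disk-based shard allocation decisions. */
    static final String BODY =
            "{\"persistent\":{\"cluster.routing.allocation.disk.threshold_enabled\":false}}";

    static HttpRequest build(URI elasticsearchBase) {
        return HttpRequest.newBuilder(elasticsearchBase.resolve("/_cluster/settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }
}
```

Against a live cluster, the request could be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Disabling the decider suits throwaway test clusters; in production the watermarks protect nodes from running out of disk.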
[GitHub] [flink] MartijnVisser merged pull request #19751: [FLINK-24433][Connector][Elasticsearch] Disable Elasticsearch disk allocation decider
MartijnVisser merged PR #19751: URL: https://github.com/apache/flink/pull/19751
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #117: [FLINK-27543] Hide column statistics collector inside the file format writer.
JingsongLi commented on code in PR #117: URL: https://github.com/apache/flink-table-store/pull/117#discussion_r874898190 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.store.file.stats.FieldStatsCollector; +import org.apache.flink.table.store.file.stats.FileStatsExtractor; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.function.Function; + +/** + * An {@link FileWriter} to write generic record and generate {@link Metric} once closing it. + * + * @param generic record type. 
+ */ +public class MetricFileWriter implements FileWriter { +private static final Logger LOG = LoggerFactory.getLogger(MetricFileWriter.class); + +private final BulkWriter writer; +private final Function converter; +private final FSDataOutputStream out; +private final Path path; +private final FileStatsExtractor fileStatsExtractor; + +private FieldStatsCollector fieldStatsCollector = null; + +private long recordCount; +private boolean closed = false; + +private MetricFileWriter( +BulkWriter writer, +Function converter, +FSDataOutputStream out, +Path path, +RowType writeSchema, +FileStatsExtractor fileStatsExtractor) { +this.writer = writer; +this.converter = converter; +this.out = out; +this.path = path; + +this.fileStatsExtractor = fileStatsExtractor; +if (this.fileStatsExtractor == null) { +this.fieldStatsCollector = new FieldStatsCollector(writeSchema); +} + +this.recordCount = 0L; +} + +@Override +public void write(T record) throws IOException { +RowData rowData = converter.apply(record); +writer.addElement(rowData); + +if (fieldStatsCollector != null) { +fieldStatsCollector.collect(rowData); +} + +recordCount += 1; +} + +@Override +public long recordCount() { +return recordCount; +} + +@Override +public long length() { +try { +return out.getPos(); +} catch (IOException e) { +throw new UncheckedIOException(e); +} +} + +@Override +public void abort() { +try { +close(); +} catch (IOException e) { +throw new UncheckedIOException(e); +} + +FileUtils.deleteOrWarn(path); +} + +@Override +public Metric result() throws IOException { +Preconditions.checkState(closed, "Cannot access metric unless the writer is closed."); + +FieldStats[] stats; +if (fileStatsExtractor != null) { +stats = fileStatsExtractor.extract(path); +} else { +stats = fieldStatsCollector.extractFieldStats(); +} + +return new Metric(stats, recordCount); +} + +@Override +public void close() throws IOException { +if (!closed) { +if (writer != null) { +writer.flush(); +writer.finish(); +} + +if (out != 
null) { +out.flush(); +out.close(); +} + +closed = true; +} +} + +public static FileWriter.Factory createFactory( +BulkWriter.Factory factory, +Function converter, +RowType writeSchema, +FileStatsExtractor
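`MetricFileWriter` above counts records and, unless a `FileStatsExtractor` reads statistics back from the finished file, collects per-field stats while rows are written. `result()` is only allowed after `close()`. Below is a stdlib-only sketch of that write-then-summarize contract, with rows modeled as `long[]` and stats reduced to per-column min/max. `StatsCollectingWriter` is a hypothetical name, and the real `FieldStatsCollector` works on `RowData` and covers more types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a writer that gathers stats while writing; not flink-table-store code. */
class StatsCollectingWriter {
    /** Per-column min/max plus the record count, available only after close(). */
    static final class Result {
        final long[] min;
        final long[] max;
        final long recordCount;

        Result(long[] min, long[] max, long recordCount) {
            this.min = min;
            this.max = max;
            this.recordCount = recordCount;
        }
    }

    private final List<long[]> sink = new ArrayList<>(); // stand-in for the BulkWriter/output stream
    private final long[] min;
    private final long[] max;
    private long recordCount;
    private boolean closed;

    StatsCollectingWriter(int numFields) {
        min = new long[numFields];
        max = new long[numFields];
        // Sentinel values; they stay unchanged if nothing is written.
        Arrays.fill(min, Long.MAX_VALUE);
        Arrays.fill(max, Long.MIN_VALUE);
    }

    void write(long[] row) {
        if (closed) throw new IllegalStateException("writer is closed");
        if (row.length != min.length) throw new IllegalArgumentException("unexpected arity");
        sink.add(row.clone());
        // Update stats as rows flow through, in the spirit of fieldStatsCollector.collect(rowData).
        for (int i = 0; i < row.length; i++) {
            min[i] = Math.min(min[i], row[i]);
            max[i] = Math.max(max[i], row[i]);
        }
        recordCount++;
    }

    /** Idempotent, like the guarded close() in the diff. */
    void close() {
        closed = true;
    }

    Result result() {
        // Mirrors Preconditions.checkState(closed, ...) in the diff.
        if (!closed) throw new IllegalStateException("Cannot access metric unless the writer is closed.");
        return new Result(min.clone(), max.clone(), recordCount);
    }
}
```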
[jira] [Created] (FLINK-27670) Python wrappers for Kinesis Sinks
Zichen Liu created FLINK-27670: -- Summary: Python wrappers for Kinesis Sinks Key: FLINK-27670 URL: https://issues.apache.org/jira/browse/FLINK-27670 Project: Flink Issue Type: Improvement Components: Connectors / Kinesis Reporter: Zichen Liu Fix For: 1.15.1 Create Python Wrappers for the new Kinesis Streams sink and the Kinesis Firehose sink. An example implementation may be found here [https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.
[jira] [Commented] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
[ https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538236#comment-17538236 ] Chesnay Schepler commented on FLINK-27640: -- FYI, this is because you are using a newer Maven version than recommended, which blocks HTTP repositories. The conjars repository is defined in the Hive POMs. We could override those, but that isn't ideal because it is only required for Hive 2.x, not for Hive 3.x, which we also support. > Flink not compiling, flink-connector-hive_2.12 is missing jhyde > pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde > -- > > Key: FLINK-27640 > URL: https://issues.apache.org/jira/browse/FLINK-27640 > Project: Flink > Issue Type: Bug > Components: Build System, Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Piotr Nowojski >Assignee: Chesnay Schepler >Priority: Critical > > When clean installing whole project after cleaning local {{.m2}} directory I > encountered the following error when compiling flink-connector-hive_2.12: > {noformat} > [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could > not resolve dependencies for project > org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to > collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> > org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read > artifact descriptor for > org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer > artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to > maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for > repositories: [conjars (http://conjars.org/repo, default, > releases+snapshots), apache.snapshots > (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1] > {noformat} > I've solved this by adding > {noformat} > > spring-repo-plugins > https://repo.spring.io/ui/native/plugins-release/ > > {noformat} > to ~/.m2/settings.xml file.
[GitHub] [flink] flinkbot commented on pull request #19751: [FLINK-24433][Connector][Elasticsearch] Disable Elasticsearch disk allocation decider
flinkbot commented on PR #19751: URL: https://github.com/apache/flink/pull/19751#issuecomment-1128945587 ## CI report: * cbbdef5ae62531f244db5b841b38bd781ac1adff UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] MartijnVisser commented on pull request #19751: [FLINK-24433][Connector][Elasticsearch] Disable Elasticsearch disk allocation decider
MartijnVisser commented on PR #19751: URL: https://github.com/apache/flink/pull/19751#issuecomment-1128939651 A successful pipeline run can be found at https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35747=results