[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #342: [FLINK-28846] Trigger event on validation error
gyfora commented on code in PR #342: URL: https://github.com/apache/flink-kubernetes-operator/pull/342#discussion_r940969053 ## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java: ## @@ -864,6 +872,23 @@ public void testSuccessfulObservationShouldClearErrors() throws Exception { appCluster.getStatus().getReconciliationStatus().getLastStableSpec()); } +@Test +public void testValidationError() throws Exception { +assertTrue(testController.events().isEmpty()); +var flinkDeployment = TestUtils.buildApplicationCluster(); +flinkDeployment.getSpec().getJob().setParallelism(-1); +testController.reconcile(flinkDeployment, context); + +assertEquals(1, testController.events().size()); +assertEquals( +ResourceLifecycleState.FAILED, flinkDeployment.getStatus().getLifecycleState()); + +var event = testController.events().remove(); +assertEquals("Warning", event.getType()); Review Comment: In many cases I prefer to spell out these strings in tests. That also directly guards against making breaking changes too easy :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #342: [FLINK-28846] Trigger event on validation error
morhidi commented on PR #342: URL: https://github.com/apache/flink-kubernetes-operator/pull/342#issuecomment-1208987995 Thanks @gyfora, this was indeed missing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #342: [FLINK-28846] Trigger event on validation error
morhidi commented on code in PR #342: URL: https://github.com/apache/flink-kubernetes-operator/pull/342#discussion_r940967644 ## flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java: ## @@ -864,6 +872,23 @@ public void testSuccessfulObservationShouldClearErrors() throws Exception { appCluster.getStatus().getReconciliationStatus().getLastStableSpec()); } +@Test +public void testValidationError() throws Exception { +assertTrue(testController.events().isEmpty()); +var flinkDeployment = TestUtils.buildApplicationCluster(); +flinkDeployment.getSpec().getJob().setParallelism(-1); +testController.reconcile(flinkDeployment, context); + +assertEquals(1, testController.events().size()); +assertEquals( +ResourceLifecycleState.FAILED, flinkDeployment.getStatus().getLifecycleState()); + +var event = testController.events().remove(); +assertEquals("Warning", event.getType()); Review Comment: nit: You can consider using Enum.valueOf() here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
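For readers following this review thread: the two suggestions above (spelling out the literal string, and deriving the expected value from an enum via Enum.valueOf()) can be combined so the assertion both documents the expected wire value and stays in sync with the enum. The snippet below is only a minimal JUnit sketch; the EventType enum, the Event class and its getType() accessor are hypothetical stand-ins, not the actual flink-kubernetes-operator classes.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class EventTypeAssertionSketch {

    // Hypothetical enum standing in for the operator's event type constants.
    enum EventType {
        Normal,
        Warning
    }

    // Hypothetical event record with a String-typed type field, as in the test under review.
    static class Event {
        private final String type;

        Event(String type) {
            this.type = type;
        }

        String getType() {
            return type;
        }
    }

    @Test
    public void testEventTypeIsWarning() {
        Event event = new Event("Warning");

        // Option 1 (spell out the literal): an accidental rename of the emitted value breaks the test loudly.
        assertEquals("Warning", event.getType());

        // Option 2 (round-trip through the enum): Enum.valueOf throws if the string no longer
        // matches a constant, so the test also guards the enum itself.
        assertEquals(EventType.Warning, EventType.valueOf(event.getType()));
    }
}
```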
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20421: [FLINK-28732][state] Deprecate ambiguous StateTTLConfig#cleanFullSnapshot API
rkhachatryan commented on code in PR #20421: URL: https://github.com/apache/flink/pull/20421#discussion_r940964862 ## docs/content/docs/dev/datastream/fault-tolerance/state.md: ## @@ -474,9 +474,10 @@ ttl_config = StateTtlConfig \ For more fine-grained control over some special cleanup in background, you can configure it separately as described below. Currently, heap state backend relies on incremental cleanup and RocksDB backend uses compaction filter for background cleanup. -# Cleanup in full snapshot +# Cleanup in full scan snapshot -Additionally, you can activate the cleanup at the moment of taking the full state snapshot which +Additionally, you can activate the cleanup at the moment of taking the full scan state snapshot (means the +canonical savepoint of all state-backends, or the checkpoint/native-savepoint of hashmap state-backend.), which Review Comment: Hmm..I was thinking that RocksDB non-incremental checkpoints also support this TTL mode, don't they? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
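For context on the documentation change being reviewed above: the cleanup mode in question is configured through StateTtlConfig. Below is a minimal sketch using the existing cleanupFullSnapshot() builder method, which is exactly the API whose naming this PR is revisiting; whether the behavior also covers RocksDB non-incremental checkpoints is the open question raised in the review.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlFullSnapshotCleanupExample {

    public static ValueStateDescriptor<String> buildDescriptor() {
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.days(7))
                        // Drop expired entries while taking a full (non-incremental) state snapshot,
                        // so the state restored from that snapshot no longer contains expired data.
                        .cleanupFullSnapshot()
                        .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("last-event", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```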
[GitHub] [flink] hlteoh37 commented on pull request #20245: [FLINK-28487][connectors] Introduce configurable RateLimitingStrategy…
hlteoh37 commented on PR #20245: URL: https://github.com/apache/flink/pull/20245#issuecomment-1208983984 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28374) Some further improvements of blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-28374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao closed FLINK-28374. --- Resolution: Fixed > Some further improvements of blocking shuffle > - > > Key: FLINK-28374 > URL: https://issues.apache.org/jira/browse/FLINK-28374 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Yingjie Cao >Priority: Major > Fix For: 1.16.0 > > > This is an umbrella issue for sort-shuffle Improvements. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #340: [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
gyfora merged PR #340: URL: https://github.com/apache/flink-kubernetes-operator/pull/340 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-28373) Read a full buffer of data per file IO read request for sort-shuffle
[ https://issues.apache.org/jira/browse/FLINK-28373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao resolved FLINK-28373. - Resolution: Fixed Merged into master via d6a47d897a9a4753c800b39adb17c06e154422cc > Read a full buffer of data per file IO read request for sort-shuffle > > > Key: FLINK-28373 > URL: https://issues.apache.org/jira/browse/FLINK-28373 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, for sort blocking shuffle, the corresponding data readers read > shuffle data at buffer granularity. Before compression, each buffer is 32K by > default; after compression the size becomes smaller (it may be less than 10K). > For file IO, this is pretty small. To achieve better performance and reduce > IOPS, we can read more data per IO read request and parse the buffer headers and > data in memory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
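To make the intent of this change concrete: instead of issuing one small read per (possibly compressed) buffer, the reader can pull a large chunk from the file in a single request and then slice the individual buffers out of it in memory. The following is a rough, simplified sketch of that pattern, not the actual Flink reader; the 8-byte header layout (payload length plus a flags word) is assumed purely for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class ChunkedShuffleFileReader {

    private static final int HEADER_LENGTH = 8; // assumed: 4-byte payload length + 4-byte flags

    /** Reads one large chunk and slices the individual shuffle buffers out of it in memory. */
    public static List<ByteBuffer> readChunk(Path file, long offset, int chunkSize) throws IOException {
        List<ByteBuffer> buffers = new ArrayList<>();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer chunk = ByteBuffer.allocateDirect(chunkSize);
            channel.read(chunk, offset); // single large IO request instead of one request per buffer
            chunk.flip();

            // Parse headers and payloads from the in-memory chunk.
            while (chunk.remaining() >= HEADER_LENGTH) {
                int payloadLength = chunk.getInt();
                chunk.getInt(); // flags, e.g. whether the payload is compressed (ignored here)
                if (chunk.remaining() < payloadLength) {
                    break; // partial buffer at the end of the chunk; it is re-read with the next chunk
                }
                ByteBuffer payload = chunk.slice();
                payload.limit(payloadLength);
                buffers.add(payload);
                chunk.position(chunk.position() + payloadLength);
            }
        }
        return buffers;
    }
}
```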
[GitHub] [flink] wsry merged pull request #20457: [FLINK-28373][network] Read a full buffer of data per file IO read request for sort-shuffle
wsry merged PR #20457: URL: https://github.com/apache/flink/pull/20457 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-26413) Hive dialect support "LOAD DATA INPATH"
[ https://issues.apache.org/jira/browse/FLINK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-26413. --- Fix Version/s: 1.16.0 Resolution: Fixed Fixed in master: 6c91cc5999828f0a61f8a54498bc71a581a05dd8 > Hive dialect support "LOAD DATA INPATH" > > > Key: FLINK-26413 > URL: https://issues.apache.org/jira/browse/FLINK-26413 > Project: Flink > Issue Type: Sub-task >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > In Hive, it's supported to use such sql like > {code:java} > LOAD DATA INPATH > {code} > to import data to hive table. > It also needs to be supported when using the Hive dialect in Flink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-26413) Hive dialect support "LOAD DATA INPATH"
[ https://issues.apache.org/jira/browse/FLINK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-26413: Component/s: Connectors / Hive > Hive dialect support "LOAD DATA INPATH" > > > Key: FLINK-26413 > URL: https://issues.apache.org/jira/browse/FLINK-26413 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > In Hive, it's supported to use such sql like > {code:java} > LOAD DATA INPATH > {code} > to import data to hive table. > It also needs to be supported when using the Hive dialect in Flink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] wuchong merged pull request #19556: [FLINK-26413][hive] Hive dialect supports "LOAD DATA INPATH"
wuchong merged PR #19556: URL: https://github.com/apache/flink/pull/19556 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wsry commented on pull request #20457: [FLINK-28373][network] Read a full buffer of data per file IO read request for sort-shuffle
wsry commented on PR #20457: URL: https://github.com/apache/flink/pull/20457#issuecomment-1208977942 @TanYuxin-tyx Thanks for the change. LGTM. Merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27338) Improve spliting file for Hive table with orc format
[ https://issues.apache.org/jira/browse/FLINK-27338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-27338: - Summary: Improve spliting file for Hive table with orc format (was: Improve spliting file for Hive soure) > Improve spliting file for Hive table with orc format > > > Key: FLINK-27338 > URL: https://issues.apache.org/jira/browse/FLINK-27338 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, for hive source, it'll use the hdfs block size configured with key > dfs.block.size in hdfs-site.xml as the max split size to split the files. The > default value is usually 128M/256M depending on configuration. > The strategy to split file is not reasonable for the number of splits tend to > be less so that can't make good use of the parallel computing. > What's more, when enable parallelism inference for hive source, it'll set the > parallelism of Hive source to the num of splits when it's not bigger than max > parallelism. So, it'll limit the source parallelism and could degrade the > perfermance. > To solve this problem, the idea is to calcuate a reasonable split size based > on files's total size, block size, default parallelism or parallelism > configured by user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27338) Improve spliting file for Hive table with orc format
[ https://issues.apache.org/jira/browse/FLINK-27338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-27338: - Description: Currently, for hive source, it'll use the hdfs block size configured with key dfs.block.size in hdfs-site.xml as the max split size to split the files. The default value is usually 128M/256M depending on configuration. The strategy to split file is not reasonable for the number of splits tend to be less so that can't make good use of the parallel computing. What's more, when enable parallelism inference for hive source, it'll set the parallelism of Hive source to the num of splits when it's not bigger than max parallelism. So, it'll limit the source parallelism and could degrade the perfermance. To solve this problem, the idea is to calcuate a reasonable split size based on files's total size, block size, default parallelism or parallelism configured by user. The Jira is try to improve the splitting file logic for Hive table with orc format. was: Currently, for hive source, it'll use the hdfs block size configured with key dfs.block.size in hdfs-site.xml as the max split size to split the files. The default value is usually 128M/256M depending on configuration. The strategy to split file is not reasonable for the number of splits tend to be less so that can't make good use of the parallel computing. What's more, when enable parallelism inference for hive source, it'll set the parallelism of Hive source to the num of splits when it's not bigger than max parallelism. So, it'll limit the source parallelism and could degrade the perfermance. To solve this problem, the idea is to calcuate a reasonable split size based on files's total size, block size, default parallelism or parallelism configured by user. > Improve spliting file for Hive table with orc format > > > Key: FLINK-27338 > URL: https://issues.apache.org/jira/browse/FLINK-27338 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, for hive source, it'll use the hdfs block size configured with key > dfs.block.size in hdfs-site.xml as the max split size to split the files. The > default value is usually 128M/256M depending on configuration. > The strategy to split file is not reasonable for the number of splits tend to > be less so that can't make good use of the parallel computing. > What's more, when enable parallelism inference for hive source, it'll set the > parallelism of Hive source to the num of splits when it's not bigger than max > parallelism. So, it'll limit the source parallelism and could degrade the > perfermance. > To solve this problem, the idea is to calcuate a reasonable split size based > on files's total size, block size, default parallelism or parallelism > configured by user. > > The Jira is try to improve the splitting file logic for Hive table with orc > format. -- This message was sent by Atlassian Jira (v8.20.10#820010)
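A small illustration of the split-size idea described in this issue: derive the target split size from the total bytes to read and the available parallelism rather than defaulting straight to the HDFS block size. The formula below is only a sketch of the heuristic outlined above, with assumed method names and an added lower bound for illustration; the real implementation may differ.

```java
public class SplitSizeSketch {

    /**
     * Computes a target split size so that roughly {@code parallelism} splits are produced,
     * while never exceeding the HDFS block size and never going below a sanity minimum.
     */
    public static long computeSplitSize(
            long totalFileBytes, long blockSize, int parallelism, long minSplitSize) {
        long bytesPerTask = (long) Math.ceil((double) totalFileBytes / parallelism);
        // Stay within one block per split, but do not produce degenerate tiny splits either.
        return Math.max(minSplitSize, Math.min(blockSize, bytesPerTask));
    }

    public static void main(String[] args) {
        // 10 GB of ORC files, 128 MB blocks, default parallelism 200, 16 MB minimum.
        long splitSize = computeSplitSize(10L << 30, 128L << 20, 200, 16L << 20);
        // ~51 MB per split instead of 128 MB, yielding about 200 splits instead of ~80.
        System.out.println(splitSize);
    }
}
```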
[GitHub] [flink] dependabot[bot] closed pull request #20287: Bump aws-java-sdk-s3 from 1.12.7 to 1.12.261 in /flink-connectors/flink-connector-kinesis
dependabot[bot] closed pull request #20287: Bump aws-java-sdk-s3 from 1.12.7 to 1.12.261 in /flink-connectors/flink-connector-kinesis URL: https://github.com/apache/flink/pull/20287 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dependabot[bot] commented on pull request #20287: Bump aws-java-sdk-s3 from 1.12.7 to 1.12.261 in /flink-connectors/flink-connector-kinesis
dependabot[bot] commented on PR #20287: URL: https://github.com/apache/flink/pull/20287#issuecomment-1208977103 Looks like com.amazonaws:aws-java-sdk-s3 is up-to-date now, so this is no longer needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28027) Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy
[ https://issues.apache.org/jira/browse/FLINK-28027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28027: --- Labels: pull-request-available (was: ) > Initialise Async Sink maximum number of in flight messages to low number for > rate limiting strategy > --- > > Key: FLINK-28027 > URL: https://issues.apache.org/jira/browse/FLINK-28027 > Project: Flink > Issue Type: Bug > Components: Connectors / Common, Connectors / Kinesis >Affects Versions: 1.15.0 >Reporter: Zichen Liu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0 > > > *Background* > In the AsyncSinkWriter, we implement a rate limiting strategy. > The initial value for the maximum number of in flight messages is set > extremely high ({{{}maxBatchSize * maxInFlightRequests{}}}). > However, in accordance with the AIMD strategy, the TCP implementation for > congestion control has found a small value to start with [is > better]([https://en.wikipedia.org/wiki/TCP_congestion_control#Slow_start]). > *Suggestion* > A better default might be: > * maxBatchSize > * maxBatchSize / parallelism -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] dannycranmer merged pull request #20476: [FLINK-28094][kinesis][glue] Updating AWS SDK versions for Kinesis connectors and Glue Schema Registry formats
dannycranmer merged PR #20476: URL: https://github.com/apache/flink/pull/20476 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] sthm opened a new pull request, #20509: [FLINK-28027][connectors/async-sink] Implement slow start strategy fo…
sthm opened a new pull request, #20509: URL: https://github.com/apache/flink/pull/20509 ## What is the purpose of the change Implement slow start strategy for AIMD to avoid overloading destination on initialization. ## Brief change log Adapt initialization of the AIMD strategy to set the maximum initial rate to a low number rather than starting with the theoretical maximum. ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is already covered by existing tests, such as `testInitialRateIsSetByConstructor`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes (during initialization of the sink) - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
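To illustrate the change described in this PR: the AIMD rate limiter starts from a small initial rate, in the spirit of TCP slow start, instead of the theoretical maximum of maxBatchSize * maxInFlightRequests. The following is a simplified sketch of that initialization choice with assumed field and parameter names; it is not the actual AsyncSinkWriter code.

```java
public class AimdRateLimiterSketch {

    private final int maxInFlightMessages; // hard upper bound: maxBatchSize * maxInFlightRequests
    private int currentRateLimit;          // additively increased, multiplicatively decreased

    public AimdRateLimiterSketch(int maxBatchSize, int maxInFlightRequests) {
        this.maxInFlightMessages = maxBatchSize * maxInFlightRequests;
        // Slow start: begin at a single batch worth of messages rather than the theoretical
        // maximum, so a fresh (or restarted) sink cannot overload the destination immediately.
        this.currentRateLimit = maxBatchSize;
    }

    /** Additive increase after a fully successful request. */
    public void onSuccess(int increaseStep) {
        currentRateLimit = Math.min(maxInFlightMessages, currentRateLimit + increaseStep);
    }

    /** Multiplicative decrease after a throttled or failed request. */
    public void onThrottled(double decreaseFactor) {
        currentRateLimit = Math.max(1, (int) (currentRateLimit * decreaseFactor));
    }

    public int getRateLimit() {
        return currentRateLimit;
    }
}
```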
[GitHub] [flink] luoyuxia commented on pull request #20501: [FLINK-28778][SQL/API] Bulk fetch of table and column statistics for given partitions
luoyuxia commented on PR #20501: URL: https://github.com/apache/flink/pull/20501#issuecomment-1208973074 @godfreyhe Thanks for your review. I have addressed all your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] luoyuxia commented on a diff in pull request #20501: [FLINK-28778][SQL/API] Bulk fetch of table and column statistics for given partitions
luoyuxia commented on code in PR #20501: URL: https://github.com/apache/flink/pull/20501#discussion_r940951883 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java: ## @@ -90,6 +102,257 @@ public static Map createCatalogColumnSt return colStats; } +public static Map createCatalogPartitionColumnStats( +HiveMetastoreClientWrapper client, +HiveShim hiveShim, +Table hiveTable, +String partitionName, +List partitionColsSchema, +String defaultPartitionName) { +Map partitionColumnStats = new HashMap<>(); +List partitionCols = new ArrayList<>(partitionColsSchema.size()); +List partitionColsType = new ArrayList<>(partitionColsSchema.size()); +for (FieldSchema fieldSchema : partitionColsSchema) { +partitionCols.add(fieldSchema.getName()); +partitionColsType.add( +HiveTypeUtil.toFlinkType( + TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType())) +.getLogicalType()); +} + +// the partition column and values for the partition column +Map partitionColValues = new HashMap<>(); +CatalogPartitionSpec partitionSpec = +HivePartitionUtils.createPartitionSpec(partitionName, defaultPartitionName); +for (int i = 0; i < partitionCols.size(); i++) { +String partitionCol = partitionCols.get(i); +String partitionStrVal = partitionSpec.getPartitionSpec().get(partitionCols.get(i)); +if (partitionStrVal == null) { +partitionColValues.put(partitionCol, null); +} else { +partitionColValues.put( +partitionCol, +HivePartitionUtils.restorePartitionValueFromType( +hiveShim, +partitionStrVal, +partitionColsType.get(i), +defaultPartitionName)); +} +} + +// calculate statistic for each partition column +for (int i = 0; i < partitionCols.size(); i++) { +Object partitionValue = partitionColValues.get(partitionCols.get(i)); +LogicalType logicalType = partitionColsType.get(i); +CatalogColumnStatisticsDataBase catalogColumnStatistics = +getColumnStatistics( +client, +hiveTable, +logicalType, +partitionValue, +i, +defaultPartitionName); +if (catalogColumnStatistics != null) { +partitionColumnStats.put(partitionCols.get(i), catalogColumnStatistics); +} +} + +return partitionColumnStats; +} + +/** + * Get statistics for specific partition column. + * + * @param logicalType the specific partition column's logical type + * @param partitionValue the partition value for the specific partition column + * @param partitionColIndex the index of the specific partition + * @param defaultPartitionName the default partition name for null value + */ +private static CatalogColumnStatisticsDataBase getColumnStatistics( +HiveMetastoreClientWrapper client, +Table hiveTable, +LogicalType logicalType, +Object partitionValue, +int partitionColIndex, +String defaultPartitionName) { +switch (logicalType.getTypeRoot()) { +case CHAR: +case VARCHAR: +{ +Long maxLength = null; +Double avgLength = null; +Long nullCount = 0L; +if (partitionValue == null) { +nullCount = +getNullCount( +client, hiveTable, partitionColIndex, defaultPartitionName); +} else { +long valLength = ((String) partitionValue).length(); +maxLength = valLength; +avgLength = (double) valLength; +} +return new CatalogColumnStatisticsDataString( +maxLength, avgLength, 1L, nullCount); +} +case BOOLEAN: +{ +long trueCount = 0L; +long falseCount = 0L; +Long nullCount = 0L; +if (partitionValue == null) { +nullCount = +getNullCount( +client, hiveTable, partitionColIndex,
[GitHub] [flink] liming30 commented on a diff in pull request #20405: [FLINK-28010][state] Use deleteRange to optimize the clear method of RocksDBMapState.
liming30 commented on code in PR #20405: URL: https://github.com/apache/flink/pull/20405#discussion_r940950420 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java: ## @@ -223,4 +223,27 @@ public StateIncrementalVisitor getStateIncrementalVisitor( throw new UnsupportedOperationException( "Global state entry iterator is unsupported for RocksDb backend"); } + +/** + * Similar to decimal addition, add 1 to the last digit to calculate the upper bound. + * + * @param prefix the starting prefix for seek. + * @return end prefix for seek. + */ +protected final byte[] calculateUpperBound(byte[] prefix) { +byte[] upperBound = new byte[prefix.length]; +System.arraycopy(prefix, 0, upperBound, 0, prefix.length); +boolean overFlow = true; +for (int i = prefix.length - 1; i >= 0; i--) { +int unsignedValue = prefix[i] & 0xff; +int result = unsignedValue + 1; +upperBound[i] = (byte) (result & 0xff); +if (result >> 8 == 0) { +overFlow = false; +break; +} +} +Preconditions.checkArgument(!overFlow, "The upper boundary overflows."); Review Comment: If RocksDB uses unsigned numbers for matching, why do we need to compute with signed numbers here? If we handle overflow at the Java level by appending byte to the prefix, this will not get the correct result in RocksDB. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-27155) Reduce multiple reads to the same Changelog file in the same taskmanager during restore
[ https://issues.apache.org/jira/browse/FLINK-27155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan closed FLINK-27155. - Resolution: Fixed Merged as 4a0870229fc20b594eba0bccfad071a4edc3e185. > Reduce multiple reads to the same Changelog file in the same taskmanager > during restore > --- > > Key: FLINK-27155 > URL: https://issues.apache.org/jira/browse/FLINK-27155 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Feifan Wang >Assignee: Feifan Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > h3. Background > In the current implementation, State changes of different operators in the > same taskmanager may be written to the same changelog file, which effectively > reduces the number of files and requests to DFS. > But on the other hand, the current implementation also reads the same > changelog file multiple times on recovery. More specifically, the number of > times the same changelog file is accessed is related to the number of > ChangeSets contained in it. And since each read needs to skip the preceding > bytes, this network traffic is also wasted. > The result is a lot of unnecessary request to DFS when there are multiple > slots and keyed state in the same taskmanager. > h3. Proposal > We can reduce multiple reads to the same changelog file in the same > taskmanager during restore. > One possible approach is to read the changelog file all at once and cache it > in memory or local file for a period of time when reading the changelog file. > I think this could be a subtask of [v2 FLIP-158: Generalized incremental > checkpoints|https://issues.apache.org/jira/browse/FLINK-25842] . > Hi [~ym] , [~roman] how do you think about ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
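A very rough sketch of the "read once and cache" idea from the issue description: the first task to request a changelog file downloads it to a local temporary file, and later requests from the same taskmanager reuse that copy instead of re-reading (and re-skipping) the remote file. All names below are made up for illustration; the merged implementation handles reference counting, expiry and cleanup far more carefully.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class ChangelogFileLocalCacheSketch {

    private final ConcurrentMap<String, Path> localCopies = new ConcurrentHashMap<>();

    /**
     * Returns a local copy of the remote changelog file, downloading it at most once per
     * taskmanager; callers then open the local file and seek to their own offset.
     */
    public Path getOrDownload(String remotePath, Supplier<InputStream> remoteOpener) {
        return localCopies.computeIfAbsent(
                remotePath,
                key -> {
                    try (InputStream in = remoteOpener.get()) {
                        Path local = Files.createTempFile("changelog-cache-", ".bin");
                        // One full sequential read from DFS instead of one read per ChangeSet.
                        Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
                        return local;
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to cache " + key, e);
                    }
                });
    }
}
```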
[GitHub] [flink] rkhachatryan merged pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
rkhachatryan merged PR #20152: URL: https://github.com/apache/flink/pull/20152 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
rkhachatryan commented on PR #20152: URL: https://github.com/apache/flink/pull/20152#issuecomment-1208969905 Thank you for the contribution @zoltar9264, great work! Glad to hear that and no worries :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28881) PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream test failure
[ https://issues.apache.org/jira/browse/FLINK-28881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-28881: - Description: {code:java} 2022-08-09T03:14:22.0113691Z Aug 09 03:14:22 [ERROR] org.apache.flink.streaming.connectors.gcp.pubsub.PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream Time elapsed: 1.867 s <<< FAILURE! 2022-08-09T03:14:22.0114504Z Aug 09 03:14:22 java.lang.AssertionError: 2022-08-09T03:14:22.0114903Z Aug 09 03:14:22 2022-08-09T03:14:22.0115263Z Aug 09 03:14:22 Expected: <[1, 2, 3]> 2022-08-09T03:14:22.0115679Z Aug 09 03:14:22 but: was <[1, 2]> 2022-08-09T03:14:22.0116232Z Aug 09 03:14:22at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 2022-08-09T03:14:22.0116871Z Aug 09 03:14:22at org.junit.Assert.assertThat(Assert.java:964) 2022-08-09T03:14:22.0117580Z Aug 09 03:14:22at org.junit.Assert.assertThat(Assert.java:930) 2022-08-09T03:14:22.0118460Z Aug 09 03:14:22at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream(PubSubConsumingTest.java:119) {code} CI link: https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713&view=logs&j=3796201e-ea88-5776-0ea8-9ccca648a70c&t=8ca54b76-085e-5cf1-8060-2c500a258258 was: CI link: https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713&view=logs&j=3796201e-ea88-5776-0ea8-9ccca648a70c&t=8ca54b76-085e-5cf1-8060-2c500a258258 > PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream > test failure > --- > > Key: FLINK-28881 > URL: https://issues.apache.org/jira/browse/FLINK-28881 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub >Affects Versions: 1.16.0 >Reporter: Leonard Xu >Priority: Major > Labels: test-stability > > {code:java} > 2022-08-09T03:14:22.0113691Z Aug 09 03:14:22 [ERROR] > org.apache.flink.streaming.connectors.gcp.pubsub.PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream > Time elapsed: 1.867 s <<< FAILURE! > 2022-08-09T03:14:22.0114504Z Aug 09 03:14:22 java.lang.AssertionError: > 2022-08-09T03:14:22.0114903Z Aug 09 03:14:22 > 2022-08-09T03:14:22.0115263Z Aug 09 03:14:22 Expected: <[1, 2, 3]> > 2022-08-09T03:14:22.0115679Z Aug 09 03:14:22 but: was <[1, 2]> > 2022-08-09T03:14:22.0116232Z Aug 09 03:14:22 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > 2022-08-09T03:14:22.0116871Z Aug 09 03:14:22 at > org.junit.Assert.assertThat(Assert.java:964) > 2022-08-09T03:14:22.0117580Z Aug 09 03:14:22 at > org.junit.Assert.assertThat(Assert.java:930) > 2022-08-09T03:14:22.0118460Z Aug 09 03:14:22 at > org.apache.flink.streaming.connectors.gcp.pubsub.PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream(PubSubConsumingTest.java:119) > {code} > CI link: > https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713&view=logs&j=3796201e-ea88-5776-0ea8-9ccca648a70c&t=8ca54b76-085e-5cf1-8060-2c500a258258 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] liming30 commented on a diff in pull request #20405: [FLINK-28010][state] Use deleteRange to optimize the clear method of RocksDBMapState.
liming30 commented on code in PR #20405: URL: https://github.com/apache/flink/pull/20405#discussion_r940948140 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java: ## @@ -223,4 +223,27 @@ public StateIncrementalVisitor getStateIncrementalVisitor( throw new UnsupportedOperationException( "Global state entry iterator is unsupported for RocksDb backend"); } + +/** + * Similar to decimal addition, add 1 to the last digit to calculate the upper bound. + * + * @param prefix the starting prefix for seek. + * @return end prefix for seek. + */ +protected final byte[] calculateUpperBound(byte[] prefix) { +byte[] upperBound = new byte[prefix.length]; +System.arraycopy(prefix, 0, upperBound, 0, prefix.length); +boolean overFlow = true; +for (int i = prefix.length - 1; i >= 0; i--) { +int unsignedValue = prefix[i] & 0xff; +int result = unsignedValue + 1; +upperBound[i] = (byte) (result & 0xff); +if (result >> 8 == 0) { +overFlow = false; +break; +} +} +Preconditions.checkArgument(!overFlow, "The upper boundary overflows."); Review Comment: Assuming the max parallelism is 128(so we always serialize the KeyGroup with 1 byte.), the original parallelism is 1. Then we get a DB instance of KeyGroupRange(0, 127), which is `currentKeyGroupRange`. When our parallelism is adjusted to 2, our new `KeyGroup` assignments are [0, 63] and [64, 127]. For `KeyGroupRange`(0, 63) we delete data belonging to `KeyGroup`(64,127). Since the `deleteRange` of `RocksDB` contains the left border and does not contain the right border, in order to completely delete the data less than or equal to `KeyGroup`(127), we add 1 to the `KeyGroup`, which makes us actually serialize 128, and the serialized result is 0x80, also produces overflow in the Java sense. > CompositeKeySerializationUtils.serializeKeyGroup( **keyGroupRange.getEndKeyGroup() + 1**, stopKeyGroupPrefixBytes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28881) PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream test failure
[ https://issues.apache.org/jira/browse/FLINK-28881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-28881: - Labels: test-stability (was: ) > PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream > test failure > --- > > Key: FLINK-28881 > URL: https://issues.apache.org/jira/browse/FLINK-28881 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub >Affects Versions: 1.16.0 >Reporter: Leonard Xu >Priority: Major > Labels: test-stability > > CI link: > https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713&view=logs&j=3796201e-ea88-5776-0ea8-9ccca648a70c&t=8ca54b76-085e-5cf1-8060-2c500a258258 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-27597) Support get column statistic for Hive partition table
[ https://issues.apache.org/jira/browse/FLINK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-27597. -- Resolution: Fixed > Support get column statistic for Hive partition table > - > > Key: FLINK-27597 > URL: https://issues.apache.org/jira/browse/FLINK-27597 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, for Hive paritioned table, we don't return any statistic which > isn't friendly to sql optimization when it comes to partitioned table. So we > need to return statistic for hive parition table. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-28306) Support get table statistic for Hive partition table
[ https://issues.apache.org/jira/browse/FLINK-28306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-28306. -- Resolution: Fixed > Support get table statistic for Hive partition table > > > Key: FLINK-28306 > URL: https://issues.apache.org/jira/browse/FLINK-28306 > Project: Flink > Issue Type: Sub-task >Reporter: luoyuxia >Assignee: luoyuxia >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28848) Introduces LOOKUP join hint to support delayed retry for lookup join (table alias unsupported in hint)
[ https://issues.apache.org/jira/browse/FLINK-28848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-28848: -- Assignee: lincoln lee > Introduces LOOKUP join hint to support delayed retry for lookup join (table > alias unsupported in hint) > --- > > Key: FLINK-28848 > URL: https://issues.apache.org/jira/browse/FLINK-28848 > Project: Flink > Issue Type: Sub-task >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > main part of flip234 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #20454: [FLINK-28639][Runtime/Checkpointing] Preserve consistency of events from subtask to OC
yunfengzhou-hub commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r940930386 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java: ## @@ -216,24 +248,64 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures( return snapshotInProgress; } -private static OperatorSnapshotFutures checkpointStreamOperator( +private OperatorSnapshotFutures checkpointStreamOperator( StreamOperator op, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation, Supplier isRunning) throws Exception { try { -return op.snapshotState( -checkpointMetaData.getCheckpointId(), -checkpointMetaData.getTimestamp(), -checkpointOptions, -storageLocation); +if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) { +operatorEventDispatcher.snapshotOperatorEventGatewayState( +op.getOperatorID(), getOperatorStateBackend(op)); +} + +OperatorSnapshotFutures futures = +op.snapshotState( +checkpointMetaData.getCheckpointId(), +checkpointMetaData.getTimestamp(), +checkpointOptions, +storageLocation); + +if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) { +operatorEventDispatcher.notifyOperatorSnapshotStateCompleted( Review Comment: Although `OperatorSnapshotFutures` have not been completed when the method above returns, it is guaranteed that the states to be saved into the snapshot would not be changed by any following operations on the operator, thus it is safe to send ACK events from the operator to its OC and release the buffered events. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java: ## @@ -66,6 +78,14 @@ void dispatchEventToHandlers( throw new FlinkException("Could not deserialize operator event", e); } +if (evt instanceof CloseGatewayEvent) { +OperatorEventGatewayImpl gateway = getOperatorEventGateway(operatorID); +gateway.sendEventToCoordinator( +new AcknowledgeCloseGatewayEvent((CloseGatewayEvent) evt)); Review Comment: According to our offline discussion, I'll update the closing gateway process, to close gateways before sending `AcknowledgeCloseGatewayEvent`, so as to make sure there will be no event reaching the OC after the `AcknowledgeCloseGatewayEvent`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] yunfengzhou-hub commented on a diff in pull request #20454: [FLINK-28639][Runtime/Checkpointing] Preserve consistency of events from subtask to OC
yunfengzhou-hub commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r940930386 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java: ## @@ -216,24 +248,64 @@ private OperatorSnapshotFutures buildOperatorSnapshotFutures( return snapshotInProgress; } -private static OperatorSnapshotFutures checkpointStreamOperator( +private OperatorSnapshotFutures checkpointStreamOperator( StreamOperator op, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation, Supplier isRunning) throws Exception { try { -return op.snapshotState( -checkpointMetaData.getCheckpointId(), -checkpointMetaData.getTimestamp(), -checkpointOptions, -storageLocation); +if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) { +operatorEventDispatcher.snapshotOperatorEventGatewayState( +op.getOperatorID(), getOperatorStateBackend(op)); +} + +OperatorSnapshotFutures futures = +op.snapshotState( +checkpointMetaData.getCheckpointId(), +checkpointMetaData.getTimestamp(), +checkpointOptions, +storageLocation); + +if (operatorEventDispatcher.containsOperatorEventGateway(op.getOperatorID())) { +operatorEventDispatcher.notifyOperatorSnapshotStateCompleted( Review Comment: Although `OperatorSnapshotFutures` have not been completed when the method above returns, it is guaranteed that the states to be saved into the snapshot would not be changed by any following operations on the operator, thus it is safe to send ACK events from the operator to its OC and release the buffered events. According to our offline discussion, I'll update the closing gateway process, to close gateways before sending `AcknowledgeCloseGatewayEvent`, so as to make sure there will be no event reaching the OC after the `AcknowledgeCloseGatewayEvent`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] PatrickRen commented on a diff in pull request #20495: [FLINK-28868][connector/hbase] Migrate HBase table connector to the new LookupFunction interface
PatrickRen commented on code in PR #20495: URL: https://github.com/apache/flink/pull/20495#discussion_r940929212 ## flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java: ## @@ -68,54 +65,37 @@ public class HBaseRowDataLookupFunction extends TableFunction { private transient HTable table; private transient HBaseSerde serde; -private final long cacheMaxSize; -private final long cacheExpireMs; private final int maxRetryTimes; -private transient Cache cache; public HBaseRowDataLookupFunction( Configuration configuration, String hTableName, HBaseTableSchema hbaseTableSchema, String nullStringLiteral, -HBaseLookupOptions lookupOptions) { +int maxRetryTimes) { this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration); this.hTableName = hTableName; this.hbaseTableSchema = hbaseTableSchema; this.nullStringLiteral = nullStringLiteral; -this.cacheMaxSize = lookupOptions.getCacheMaxSize(); -this.cacheExpireMs = lookupOptions.getCacheExpireMs(); -this.maxRetryTimes = lookupOptions.getMaxRetryTimes(); +this.maxRetryTimes = maxRetryTimes; } /** * The invoke entry point of lookup function. * - * @param rowKey the lookup key. Currently only support single rowkey. + * @param keyRow - A {@link RowData} that wraps lookup keys. Currently only support single + * rowkey. */ -public void eval(Object rowKey) throws IOException { -if (cache != null) { -RowData cacheRowData = cache.getIfPresent(rowKey); -if (cacheRowData != null) { -collect(cacheRowData); -return; -} -} +@Override +public Collection lookup(RowData keyRow) throws IOException { for (int retry = 0; retry <= maxRetryTimes; retry++) { try { // fetch result -Get get = serde.createGet(rowKey); +Get get = serde.createGet(((GenericRowData) keyRow).getField(0)); Review Comment: The implementation of LookupFunction will pass a GenericRowData as key row and it's safe to cast for now. I added a TODO here to update the logic once we improve the LookupFunction in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] PatrickRen commented on a diff in pull request #20489: [FLINK-28854][connector/jdbc] Migrate JDBC lookup table to the new LookupFunction and caching interface
PatrickRen commented on code in PR #20489: URL: https://github.com/apache/flink/pull/20489#discussion_r940927210 ## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java: ## @@ -181,6 +179,27 @@ private JdbcDmlOptions getJdbcDmlOptions( .build(); } +@Nullable +private LookupCache getLookupCache(ReadableConfig tableOptions) { +LookupCache cache = null; +// Legacy cache options +if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0 +&& tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0) { +cache = +DefaultLookupCache.newBuilder() + .maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS)) + .expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL)) + .cacheMissingKey(tableOptions.get(LOOKUP_CACHE_MISSING_KEY)) +.build(); +} Review Comment: Actually old keys are not fully compatible with new options considering we introduces LookupOptions#CACHE_TYPE and maybe full caching in the future. I'll keep the current logic here for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xintongsong commented on pull request #20456: [FLINK-28785][network] Hybrid shuffle consumer thread and upstream thread may have deadlock
xintongsong commented on PR #20456: URL: https://github.com/apache/flink/pull/20456#issuecomment-1208929966 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-18572) Flink web UI doesn't display tolerable-failed-checkpoints
[ https://issues.apache.org/jira/browse/FLINK-18572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577144#comment-17577144 ] Zhanghao Chen commented on FLINK-18572: --- Verified that this is already included in Web UI in Flink 1.16, we can close this ticket so that we won't confuse others. > Flink web UI doesn't display tolerable-failed-checkpoints > - > > Key: FLINK-18572 > URL: https://issues.apache.org/jira/browse/FLINK-18572 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST, Runtime / Web Frontend >Affects Versions: 1.11.0 >Reporter: Steven Zhen Wu >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > Attachments: image-2020-07-12-10-14-49-990.png > > > might be helpful to display the number of tolerable-failed-checkpoints in web > UI. > > !image-2020-07-12-10-14-49-990.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lindong28 commented on pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
lindong28 commented on PR #20275: URL: https://github.com/apache/flink/pull/20275#issuecomment-1208916714 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Sxnan commented on pull request #20491: [FLINK-28857][docs] Add Document for DataStream Cache API
Sxnan commented on PR #20491: URL: https://github.com/apache/flink/pull/20491#issuecomment-1208908109 @gaoyunhaii Thanks for your comment! The PR has been updated accordingly. Could you please take another look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-25592) Improvement of parser, optimizer and execution for Flink Batch SQL
[ https://issues.apache.org/jira/browse/FLINK-25592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-25592: Fix Version/s: (was: 1.16.0) > Improvement of parser, optimizer and execution for Flink Batch SQL > -- > > Key: FLINK-25592 > URL: https://issues.apache.org/jira/browse/FLINK-25592 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime >Reporter: Jing Zhang >Priority: Major > > This is a parent JIRA to track improvements on Flink Batch SQL, including > parser, optimizer and execution. > For example, > 1. using Hive dialect and default dialect, some sql query would be translated > into different plans > 2. specify hash/sort aggregate strategy and hash/sort merge join strategy in > sql hint > 3. take parquet metadata into consideration in optimization > 4. and so on > Please note, some improvements are not limited to batch sql. Maybe streaming > sql job could also benefits from some improvements in this JIRA. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27660) Table API support create function using customed jar
[ https://issues.apache.org/jira/browse/FLINK-27660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-27660: Fix Version/s: (was: 1.16.0) > Table API support create function using customed jar > > > Key: FLINK-27660 > URL: https://issues.apache.org/jira/browse/FLINK-27660 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.16.0 >Reporter: dalongliu >Assignee: Peter Huang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28871) Make DPP also works if batch shuffle mode is not ALL_BLOCKING
[ https://issues.apache.org/jira/browse/FLINK-28871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-28871: -- Assignee: Yun Gao > Make DPP also works if batch shuffle mode is not ALL_BLOCKING > - > > Key: FLINK-28871 > URL: https://issues.apache.org/jira/browse/FLINK-28871 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently dpp only works when all edges is blocking. Otherwise if the dynamic > filtering data collector is located in the same region with the fact source, > the fact source would not be started after the data collector task. > To fix this issue, we'll force the collector task's output edges to be > blocking. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] yunfengzhou-hub commented on pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks
yunfengzhou-hub commented on PR #20275: URL: https://github.com/apache/flink/pull/20275#issuecomment-1208903178 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka commented on a diff in pull request #20405: [FLINK-28010][state] Use deleteRange to optimize the clear method of RocksDBMapState.
Myasuka commented on code in PR #20405: URL: https://github.com/apache/flink/pull/20405#discussion_r940884644 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java: ## @@ -223,4 +223,27 @@ public StateIncrementalVisitor getStateIncrementalVisitor( throw new UnsupportedOperationException( "Global state entry iterator is unsupported for RocksDb backend"); } + +/** + * Similar to decimal addition, add 1 to the last digit to calculate the upper bound. + * + * @param prefix the starting prefix for seek. + * @return end prefix for seek. + */ +protected final byte[] calculateUpperBound(byte[] prefix) { +byte[] upperBound = new byte[prefix.length]; +System.arraycopy(prefix, 0, upperBound, 0, prefix.length); +boolean overFlow = true; +for (int i = prefix.length - 1; i >= 0; i--) { +int unsignedValue = prefix[i] & 0xff; +int result = unsignedValue + 1; +upperBound[i] = (byte) (result & 0xff); +if (result >> 8 == 0) { +overFlow = false; +break; +} +} +Preconditions.checkArgument(!overFlow, "The upper boundary overflows."); Review Comment: Though RocksDB uses `unsigned` bytes, it will be really strange to treat `-128` with a single byte as not overflow on the java side. As I said before, we will not meet the case to write `-128` within [clipDBWithKeyGroupRange](https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java#L128). During the restoring, we cannot change the max parallelism. If we can make the condition `currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()` pass, the max parallelism is larger than 128, which means the `prefixKeyGroupBytes` is 2. Thus, we will get `0x0080` for the prefix byte array of `127 + 1`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
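To make the two-byte example from this discussion concrete: once the max parallelism is larger than 128, the key-group prefix is serialized into two bytes, so the exclusive upper bound for end key group 127 is key group 128, i.e. the bytes 0x00 0x80, and no carry or overflow handling is needed on the Java side. The snippet below is a small self-contained sketch of that big-endian serialization; it is assumed to mirror what CompositeKeySerializationUtils.serializeKeyGroup does, but is written independently here.

```java
public class KeyGroupPrefixSketch {

    /** Serializes a key group id into {@code numBytes} big-endian bytes (most significant first). */
    public static byte[] serializeKeyGroup(int keyGroup, int numBytes) {
        byte[] prefix = new byte[numBytes];
        for (int i = numBytes - 1; i >= 0; i--) {
            prefix[i] = (byte) (keyGroup & 0xFF);
            keyGroup >>>= 8;
        }
        return prefix;
    }

    public static void main(String[] args) {
        // Max parallelism > 128 means two prefix bytes. The exclusive upper bound for end key
        // group 127 is 127 + 1 = 128, serialized as 0x00 0x80. RocksDB compares these bytes as
        // unsigned values, so 0x80 sorts after every prefix in the range being deleted.
        byte[] upperBound = serializeKeyGroup(127 + 1, 2);
        System.out.printf("0x%02X 0x%02X%n", upperBound[0], upperBound[1]);
    }
}
```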
[GitHub] [flink] godfreyhe commented on a diff in pull request #20497: [FLINK-28871][table-planner] Force the output edges of dynamic filtering data collector to be BLOCKING
godfreyhe commented on code in PR #20497: URL: https://github.com/apache/flink/pull/20497#discussion_r940877755 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java: ## @@ -84,4 +100,108 @@ protected void visitNode(ExecNode node) { return execGraph; } + +private ExecNodeGraph enforceDimSideBlockingExchange( Review Comment: make sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-28882) ENCODE return error
Luning Wang created FLINK-28882: --- Summary: ENCODE return error Key: FLINK-28882 URL: https://issues.apache.org/jira/browse/FLINK-28882 Project: Flink Issue Type: Improvement Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.15.0 Reporter: Luning Wang Running the following in SQL Client returns 'k' rather than 'kyuubi', while the 1.14 version correctly returns 'kyuubi'. {code:java} select encode('kyuubi', 'UTF-8') {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] dianfu commented on a diff in pull request #20499: [FLINK-28862][python][format/parquet] Support ParquetRowDataWriter
dianfu commented on code in PR #20499: URL: https://github.com/apache/flink/pull/20499#discussion_r94089 ## flink-python/pyflink/datastream/__init__.py: ## @@ -225,6 +225,9 @@ - :class:`formats.parquet.ParquetColumnarRowInputFormat`: A :class:`connectors.file_system.BulkFormat` to read columnar parquet files into Row data in a batch-processing fashion. +- :class:`formats.parquet.ParquetRowDataWriter`: + Convenient builder to create a :class:`BulkWriterFactory` that writes Rows with a defined Review Comment: ```suggestion Convenient builder to create a :class:`connector.file_system.BulkWriterFactory` that writes Rows with a defined ``` ## flink-python/pyflink/datastream/__init__.py: ## @@ -225,6 +225,9 @@ - :class:`formats.parquet.ParquetColumnarRowInputFormat`: A :class:`connectors.file_system.BulkFormat` to read columnar parquet files into Row data in a batch-processing fashion. +- :class:`formats.parquet.ParquetRowDataWriter`: + Convenient builder to create a :class:`BulkWriterFactory` that writes Rows with a defined + :class:`RowType` into Parquet files in a batch fashion. Review Comment: ```suggestion :class:`pyflink.table.types.RowType` into Parquet files in a batch fashion. ``` ## flink-python/pyflink/datastream/formats/parquet.py: ## @@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory': jvm = get_gateway().jvm JAvroParquetWriters = jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters return BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema)) + + +class ParquetRowDataWriter(object): Review Comment: Could we group the classes in this file a bit to make it more readable? e.g. in the following order: AvroParquetReaders, AvroParquetWriters, ParquetColumnarRowInputFormat, ParquetBulkWriter. ## flink-python/pyflink/datastream/formats/parquet.py: ## @@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory': jvm = get_gateway().jvm JAvroParquetWriters = jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters return BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema)) + + +class ParquetRowDataWriter(object): +""" +Convenient builder to create a :class:`BulkWriterFactory` that writes Rows with a defined +:class:`RowType` into Parquet files in a batch fashion. + +.. versionadded:: 1.16.0 +""" + +@staticmethod +def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] = None, + utc_timestamp: bool = False) -> 'BulkWriterFactory': +""" +Create a :class:`RowDataBulkWriterFactory` that writes Rows records with a defined Review Comment: ```suggestion Create a :class:`pyflink.datastream.connectors.file_system.RowDataBulkWriterFactory` that writes Rows records with a defined ``` Need to make sure it could generate a valid link in the documentation. ## flink-python/pyflink/datastream/formats/parquet.py: ## @@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory': jvm = get_gateway().jvm JAvroParquetWriters = jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters return BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema)) + + +class ParquetRowDataWriter(object): +""" +Convenient builder to create a :class:`BulkWriterFactory` that writes Rows with a defined +:class:`RowType` into Parquet files in a batch fashion. + +.. 
versionadded:: 1.16.0 +""" + +@staticmethod +def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] = None, + utc_timestamp: bool = False) -> 'BulkWriterFactory': +""" +Create a :class:`RowDataBulkWriterFactory` that writes Rows records with a defined +:class:`RowType` into Parquet files in a batch fashion. + +Example: +:: + +>>> row_type = DataTypes.ROW([ +... DataTypes.FIELD('string', DataTypes.STRING()), +... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())) +... ]) +>>> row_type_info = Types.ROW_NAMED( +... ['string', 'int_array'], +... [Types.STRING(), Types.LIST(Types.INT())] +... ) +>>> sink = FileSink.for_bulk_format( +... OUTPUT_DIR, ParquetRowDataWriter.for_row_type( +... row_type, +... hadoop_config=Configuration(), +... utc_timestamp=True, +... ) +... ).build() Review Comment: ```suggestion ... ) ``` ## flink-python/pyflink/datastream/for
[GitHub] [flink] Myasuka commented on a diff in pull request #20421: [FLINK-28732][state] Deprecate ambiguous StateTTLConfig#cleanFullSnapshot API
Myasuka commented on code in PR #20421: URL: https://github.com/apache/flink/pull/20421#discussion_r940868846 ## docs/content/docs/dev/datastream/fault-tolerance/state.md: ## @@ -474,9 +474,10 @@ ttl_config = StateTtlConfig \ For more fine-grained control over some special cleanup in background, you can configure it separately as described below. Currently, heap state backend relies on incremental cleanup and RocksDB backend uses compaction filter for background cleanup. -# Cleanup in full snapshot +# Cleanup in full scan snapshot -Additionally, you can activate the cleanup at the moment of taking the full state snapshot which +Additionally, you can activate the cleanup at the moment of taking the full scan state snapshot (including the canonical savepoint, +or the full/incremental checkpoint of hashmap state-backend, or the full checkpoint of RocksDB state-backend), which Review Comment: Got it, I will refactor these descriptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
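As a reference point for the cleanup being documented above, this is a minimal sketch of enabling full-snapshot cleanup on the API side, assuming the existing `cleanupFullSnapshot()` builder method whose naming this PR discusses; the wrapper class is illustrative only.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public final class TtlFullSnapshotCleanupSketch {

    public static StateTtlConfig ttlConfig() {
        return StateTtlConfig.newBuilder(Time.days(7))
                // Expired entries are filtered out while a full, scan-based snapshot is taken
                // (e.g. a canonical savepoint); it does not shrink incremental RocksDB checkpoints.
                .cleanupFullSnapshot()
                .build();
    }
}
```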
[jira] [Created] (FLINK-28881) PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream test failure
Leonard Xu created FLINK-28881: -- Summary: PubSubConsumingTest.testStoppingConnectorWhenDeserializationSchemaIndicatesEndOfStream test failure Key: FLINK-28881 URL: https://issues.apache.org/jira/browse/FLINK-28881 Project: Flink Issue Type: Bug Components: Connectors / Google Cloud PubSub Affects Versions: 1.16.0 Reporter: Leonard Xu CI link: https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713&view=logs&j=3796201e-ea88-5776-0ea8-9ccca648a70c&t=8ca54b76-085e-5cf1-8060-2c500a258258 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zoltar9264 commented on pull request #20152: [FLINK-27155][changelog] Reduce multiple reads to the same Changelog …
zoltar9264 commented on PR #20152: URL: https://github.com/apache/flink/pull/20152#issuecomment-1208878901 Thanks @rkhachatryan ! I'm very grateful for your help over the past few weeks; I've learned a lot from it, and I'm sorry for the trouble I've caused you due to my inexperience. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27691) RankHarnessTest. testUpdateRankWithRowNumberSortKeyDropsToNotLast test failed with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu updated FLINK-27691: --- Priority: Critical (was: Major) > RankHarnessTest. testUpdateRankWithRowNumberSortKeyDropsToNotLast test failed > with result mismatch > -- > > Key: FLINK-27691 > URL: https://issues.apache.org/jira/browse/FLINK-27691 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: lincoln lee >Priority: Critical > Labels: test-stability > > {code:java} > 2022-05-19T04:34:04.2677138Z May 19 04:34:04 [ERROR] > RankHarnessTest.testUpdateRankWithRowNumberSortKeyDropsToNotLast > 2022-05-19T04:34:04.2689553Z May 19 04:34:04 [ERROR] Run 1: [result > mismatch] > 2022-05-19T04:34:04.2690614Z May 19 04:34:04 expected: [+I(a,1,100,1), > 2022-05-19T04:34:04.2691128Z May 19 04:34:04 +I(b,1,90,2), > 2022-05-19T04:34:04.2691552Z May 19 04:34:04 +I(c,1,90,3), > 2022-05-19T04:34:04.2692235Z May 19 04:34:04 +I(d,1,80,4), > 2022-05-19T04:34:04.2692634Z May 19 04:34:04 +I(e,1,80,5), > 2022-05-19T04:34:04.2693060Z May 19 04:34:04 +I(f,1,70,6), > 2022-05-19T04:34:04.2693468Z May 19 04:34:04 +U(b,1,80,5), > 2022-05-19T04:34:04.2693874Z May 19 04:34:04 +U(c,1,90,2), > 2022-05-19T04:34:04.2694282Z May 19 04:34:04 +U(d,1,80,3), > 2022-05-19T04:34:04.2694670Z May 19 04:34:04 +U(e,1,80,4), > 2022-05-19T04:34:04.2696097Z May 19 04:34:04 -U(b,1,90,2), > 2022-05-19T04:34:04.2696718Z May 19 04:34:04 -U(c,1,90,3), > 2022-05-19T04:34:04.2697298Z May 19 04:34:04 -U(d,1,80,4), > 2022-05-19T04:34:04.2698102Z May 19 04:34:04 -U(e,1,80,5)] > 2022-05-19T04:34:04.2698758Z May 19 04:34:04 but was: [+I(a,1,100,1), > 2022-05-19T04:34:04.2699189Z May 19 04:34:04 +I(b,1,90,1), > 2022-05-19T04:34:04.2699607Z May 19 04:34:04 +I(c,1,90,2), > 2022-05-19T04:34:04.2700017Z May 19 04:34:04 +I(d,1,80,3), > 2022-05-19T04:34:04.2712164Z May 19 04:34:04 +I(e,1,80,4), > 2022-05-19T04:34:04.2712777Z May 19 04:34:04 +I(f,1,70,5), > 2022-05-19T04:34:04.2713191Z May 19 04:34:04 +U(b,1,80,4), > 2022-05-19T04:34:04.2713621Z May 19 04:34:04 +U(c,1,90,1), > 2022-05-19T04:34:04.2714029Z May 19 04:34:04 +U(d,1,80,2), > 2022-05-19T04:34:04.2714435Z May 19 04:34:04 +U(e,1,80,3), > 2022-05-19T04:34:04.2715272Z May 19 04:34:04 -U(b,1,90,1), > 2022-05-19T04:34:04.2715847Z May 19 04:34:04 -U(c,1,90,2), > 2022-05-19T04:34:04.2716420Z May 19 04:34:04 -U(d,1,80,3), > 2022-05-19T04:34:04.2716990Z May 19 04:34:04 -U(e,1,80,4)] > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35815&view=logs&j=a9db68b9-a7e0-54b6-0f98-010e0aff39e2&t=cdd32e0b-6047-565b-c58f-14054472f1be&l=10445 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27691) RankHarnessTest. testUpdateRankWithRowNumberSortKeyDropsToNotLast test failed with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577122#comment-17577122 ] Leonard Xu commented on FLINK-27691: https://dev.azure.com/leonardBang/Azure_CI/_build/results?buildId=713&view=ms.vss-test-web.build-test-results-tab&runId=20042&resultId=111227&paneView=debug > RankHarnessTest. testUpdateRankWithRowNumberSortKeyDropsToNotLast test failed > with result mismatch > -- > > Key: FLINK-27691 > URL: https://issues.apache.org/jira/browse/FLINK-27691 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: lincoln lee >Priority: Major > Labels: test-stability > > {code:java} > 2022-05-19T04:34:04.2677138Z May 19 04:34:04 [ERROR] > RankHarnessTest.testUpdateRankWithRowNumberSortKeyDropsToNotLast > 2022-05-19T04:34:04.2689553Z May 19 04:34:04 [ERROR] Run 1: [result > mismatch] > 2022-05-19T04:34:04.2690614Z May 19 04:34:04 expected: [+I(a,1,100,1), > 2022-05-19T04:34:04.2691128Z May 19 04:34:04 +I(b,1,90,2), > 2022-05-19T04:34:04.2691552Z May 19 04:34:04 +I(c,1,90,3), > 2022-05-19T04:34:04.2692235Z May 19 04:34:04 +I(d,1,80,4), > 2022-05-19T04:34:04.2692634Z May 19 04:34:04 +I(e,1,80,5), > 2022-05-19T04:34:04.2693060Z May 19 04:34:04 +I(f,1,70,6), > 2022-05-19T04:34:04.2693468Z May 19 04:34:04 +U(b,1,80,5), > 2022-05-19T04:34:04.2693874Z May 19 04:34:04 +U(c,1,90,2), > 2022-05-19T04:34:04.2694282Z May 19 04:34:04 +U(d,1,80,3), > 2022-05-19T04:34:04.2694670Z May 19 04:34:04 +U(e,1,80,4), > 2022-05-19T04:34:04.2696097Z May 19 04:34:04 -U(b,1,90,2), > 2022-05-19T04:34:04.2696718Z May 19 04:34:04 -U(c,1,90,3), > 2022-05-19T04:34:04.2697298Z May 19 04:34:04 -U(d,1,80,4), > 2022-05-19T04:34:04.2698102Z May 19 04:34:04 -U(e,1,80,5)] > 2022-05-19T04:34:04.2698758Z May 19 04:34:04 but was: [+I(a,1,100,1), > 2022-05-19T04:34:04.2699189Z May 19 04:34:04 +I(b,1,90,1), > 2022-05-19T04:34:04.2699607Z May 19 04:34:04 +I(c,1,90,2), > 2022-05-19T04:34:04.2700017Z May 19 04:34:04 +I(d,1,80,3), > 2022-05-19T04:34:04.2712164Z May 19 04:34:04 +I(e,1,80,4), > 2022-05-19T04:34:04.2712777Z May 19 04:34:04 +I(f,1,70,5), > 2022-05-19T04:34:04.2713191Z May 19 04:34:04 +U(b,1,80,4), > 2022-05-19T04:34:04.2713621Z May 19 04:34:04 +U(c,1,90,1), > 2022-05-19T04:34:04.2714029Z May 19 04:34:04 +U(d,1,80,2), > 2022-05-19T04:34:04.2714435Z May 19 04:34:04 +U(e,1,80,3), > 2022-05-19T04:34:04.2715272Z May 19 04:34:04 -U(b,1,90,1), > 2022-05-19T04:34:04.2715847Z May 19 04:34:04 -U(c,1,90,2), > 2022-05-19T04:34:04.2716420Z May 19 04:34:04 -U(d,1,80,3), > 2022-05-19T04:34:04.2716990Z May 19 04:34:04 -U(e,1,80,4)] > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35815&view=logs&j=a9db68b9-a7e0-54b6-0f98-010e0aff39e2&t=cdd32e0b-6047-565b-c58f-14054472f1be&l=10445 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] JesseAtSZ commented on pull request #20091: [FLINK-27570][runtime] Fix checkFailureCounter in CheckpointFailureManager for finalize failure
JesseAtSZ commented on PR #20091: URL: https://github.com/apache/flink/pull/20091#issuecomment-1208875024 @rkhachatryan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-28880) Fix CEP doc with wrong result of strict contiguity of looping patterns
[ https://issues.apache.org/jira/browse/FLINK-28880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577117#comment-17577117 ] Huang Xingbo edited comment on FLINK-28880 at 8/9/22 3:39 AM: -- Merged into master via a08b050eb9fbea319275771fd9e95bbb025e2737 Merged into release-1.15 via 6eced8aa39c9e16b9918bd8b05fed1a1e17b6fe7 Merged into release-1.14 via c5108f6ab078ffbf1cbc483ad469197988e5553b was (Author: hxbks2ks): Merged into master via a08b050eb9fbea319275771fd9e95bbb025e2737 > Fix CEP doc with wrong result of strict contiguity of looping patterns > -- > > Key: FLINK-28880 > URL: https://issues.apache.org/jira/browse/FLINK-28880 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.15.1 >Reporter: Juntao Hu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0, 1.15.2, 1.14.6 > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/cep/#contiguity-within-looping-patterns > The result of strict contiguity should be {a b1 c}, {a b2 c}, {a b3 c}, since > b is *followed by* c. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28880) Fix CEP doc with wrong result of strict contiguity of looping patterns
[ https://issues.apache.org/jira/browse/FLINK-28880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-28880: - Fix Version/s: 1.15.2 1.14.6 > Fix CEP doc with wrong result of strict contiguity of looping patterns > -- > > Key: FLINK-28880 > URL: https://issues.apache.org/jira/browse/FLINK-28880 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.15.1 >Reporter: Juntao Hu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0, 1.15.2, 1.14.6 > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/cep/#contiguity-within-looping-patterns > The result of strict contiguity should be {a b1 c}, {a b2 c}, {a b3 c}, since > b is *followed by* c. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-28880) Fix CEP doc with wrong result of strict contiguity of looping patterns
[ https://issues.apache.org/jira/browse/FLINK-28880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo resolved FLINK-28880. -- Resolution: Fixed Merged into master via a08b050eb9fbea319275771fd9e95bbb025e2737 > Fix CEP doc with wrong result of strict contiguity of looping patterns > -- > > Key: FLINK-28880 > URL: https://issues.apache.org/jira/browse/FLINK-28880 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.15.1 >Reporter: Juntao Hu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0 > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/cep/#contiguity-within-looping-patterns > The result of strict contiguity should be {a b1 c}, {a b2 c}, {a b3 c}, since > b is *followed by* c. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] HuangXingBo closed pull request #20508: [FLINK-28880][docs][cep] Fix wrong result of strict contiguity of looping patterns
HuangXingBo closed pull request #20508: [FLINK-28880][docs][cep] Fix wrong result of strict contiguity of looping patterns URL: https://github.com/apache/flink/pull/20508 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] leonardBang commented on a diff in pull request #20495: [FLINK-28868][connector/hbase] Migrate HBase table connector to the new LookupFunction interface
leonardBang commented on code in PR #20495: URL: https://github.com/apache/flink/pull/20495#discussion_r940855756 ## flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java: ## @@ -38,23 +39,19 @@ public HBaseDynamicTableSource( String tableName, HBaseTableSchema hbaseSchema, String nullStringLiteral, -HBaseLookupOptions lookupOptions) { -super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions); +int maxRetryTimes, +@Nullable LookupCache cache) { +super(conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, cache); } @Override public DynamicTableSource copy() { return new HBaseDynamicTableSource( -conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions); +conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, cache); } @Override Review Comment: Add the missing `equals()` and `hashCode()` methods? ## flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java: ## @@ -141,6 +143,32 @@ public void testTableSourceFactory() { hbaseSchema.getQualifierDataTypes("f4")); } +@Test +public void testCacheOptions() { Review Comment: ```suggestion public void testLookupOptions() { ``` `lookup.max-retries` is not related to caching. ## flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java: ## @@ -144,6 +146,32 @@ public void testTableSourceFactory() { hbaseSchema.getQualifierDataTypes("f4")); } +@Test +public void testCacheOptions() { Review Comment: ```suggestion public void testLookupOptions() { ``` ## flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java: ## @@ -78,13 +81,31 @@ public DynamicTableSource createDynamicTableSource(Context context) { String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(context.getPhysicalRowDataType()); +LookupCache cache = null; + +// Backward compatible to legacy caching options +if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0 Review Comment: Could we use the fallbackKeys approach instead? ## flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java: ## @@ -93,6 +94,8 @@ public class HBaseConnectorOptions { .defaultValue(false) .withDescription("whether to set async lookup."); +/** @deprecated Please use {@link LookupOptions#PARTIAL_CACHE_MAX_ROWS} instead. 
*/ +@Deprecated public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = Review Comment: add fallbackKeys ## flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java: ## @@ -68,54 +65,37 @@ public class HBaseRowDataLookupFunction extends TableFunction { private transient HTable table; private transient HBaseSerde serde; -private final long cacheMaxSize; -private final long cacheExpireMs; private final int maxRetryTimes; -private transient Cache cache; public HBaseRowDataLookupFunction( Configuration configuration, String hTableName, HBaseTableSchema hbaseTableSchema, String nullStringLiteral, -HBaseLookupOptions lookupOptions) { +int maxRetryTimes) { this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration); this.hTableName = hTableName; this.hbaseTableSchema = hbaseTableSchema; this.nullStringLiteral = nullStringLiteral; -this.cacheMaxSize = lookupOptions.getCacheMaxSize(); -this.cacheExpireMs = lookupOptions.getCacheExpireMs(); -this.maxRetryTimes = lookupOptions.getMaxRetryTimes(); +this.maxRetryTimes = maxRetryTimes; } /** * The invoke entry point of lookup function. * - * @param rowKey the lookup key. Currently only support single rowkey. + * @param keyRow - A {@link RowData} that wraps lookup keys. Currently only support single + * rowkey. */ -public void eval(Object rowKey) throws IOException { -if (cache != null) { -RowData cacheRowData = cache.getIfPresent(rowKey); -if (cacheRowData != null) { -collect(cacheRowData); -return; -} -} +@Override +public Collection lookup(RowData keyRow) throw
[GitHub] [flink] flinkbot commented on pull request #20508: [FLINK-28880][docs][cep] Fix wrong result of strict contiguity of looping patterns
flinkbot commented on PR #20508: URL: https://github.com/apache/flink/pull/20508#issuecomment-1208871012 ## CI report: * f24ce7e938ce1740d9e47d3d620697a9b9d24bcd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28803) Add Transformer and Estimator for KBinsDiscretizer
[ https://issues.apache.org/jira/browse/FLINK-28803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28803: --- Labels: pull-request-available (was: ) > Add Transformer and Estimator for KBinsDiscretizer > -- > > Key: FLINK-28803 > URL: https://issues.apache.org/jira/browse/FLINK-28803 > Project: Flink > Issue Type: New Feature > Components: Library / Machine Learning >Affects Versions: ml-2.2.0 >Reporter: Zhipeng Zhang >Assignee: Zhipeng Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-28838) Avoid to notify the elementQueue consumer when the fetch result is empty
[ https://issues.apache.org/jira/browse/FLINK-28838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577116#comment-17577116 ] Qingsheng Ren edited comment on FLINK-28838 at 8/9/22 3:30 AM: --- Thanks for the ticket [~aitozi]! Yeah we can definitely make some improvement as not all source implementations works as expected. I think your first proposal make sense to me. We can drop empty records earlier before putting into elementsQueue. I have some concerns about the second one (adding SleepTask) as we can hardly decide the length of sleep considering source implementations differ a lot. For example KafkaConsumer itself has ability to block the thread if no data is available for polling so it doesn't need the SleepTask at all. I prefer to leave it to split reader implementation itself as the doc of {{SplitReader#fetch}} is quite clear that it could be a blocking call. WDYT? BTW which source has this issue? We can check its implementation too. was (Author: renqs): Thanks for the ticket [~aitozi]! Yeah we can definitely make some improvement as not all source implementations works as expected. I think your first proposal make sense to me. We can drop empty records earlier before putting into elementsQueue. I have some concerns about the second one (adding SleepTask) as we can hardly decide the length of sleep considering source implementations vary a lot. For example KafkaConsumer itself has ability to block the thread if no data is available for polling so it doesn't need the SleepTask at all. I prefer to leave it to split reader implementation itself as the doc of {{SplitReader#fetch}} is quite clear that it could be a blocking call. WDYT? BTW which source has this issue? We can check its implementation too. > Avoid to notify the elementQueue consumer when the fetch result is empty > > > Key: FLINK-28838 > URL: https://issues.apache.org/jira/browse/FLINK-28838 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0, 1.15.1 >Reporter: Aitozi >Priority: Major > Fix For: 1.16.0 > > Attachments: 20220805165441.jpg > > > When using the new source api, I found that if the source has no data, it > still brings high cpu usage. > The reason behind this is that it will always return the > {{RecordsWithSplitIds}} from the {{splitReader.fetch}} in FetchTask and it > will be added to the elementQueue. It will make the consumer be notified to > wake up frequently. > This causes the thread to keep busy to run and wake up, which leads to the > high sys and user cpu usage. > I think not all the SplitReader#fetch will block until there is data, if it > returns immediately when there is no data, then this problem will happen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28838) Avoid to notify the elementQueue consumer when the fetch result is empty
[ https://issues.apache.org/jira/browse/FLINK-28838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577116#comment-17577116 ] Qingsheng Ren commented on FLINK-28838: --- Thanks for the ticket [~aitozi]! Yeah we can definitely make some improvement as not all source implementations works as expected. I think your first proposal make sense to me. We can drop empty records earlier before putting into elementsQueue. I have some concerns about the second one (adding SleepTask) as we can hardly decide the length of sleep considering source implementations vary a lot. For example KafkaConsumer itself has ability to block the thread if no data is available for polling so it doesn't need the SleepTask at all. I prefer to leave it to split reader implementation itself as the doc of {{SplitReader#fetch}} is quite clear that it could be a blocking call. WDYT? BTW which source has this issue? We can check its implementation too. > Avoid to notify the elementQueue consumer when the fetch result is empty > > > Key: FLINK-28838 > URL: https://issues.apache.org/jira/browse/FLINK-28838 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0, 1.15.1 >Reporter: Aitozi >Priority: Major > Fix For: 1.16.0 > > Attachments: 20220805165441.jpg > > > When using the new source api, I found that if the source has no data, it > still brings high cpu usage. > The reason behind this is that it will always return the > {{RecordsWithSplitIds}} from the {{splitReader.fetch}} in FetchTask and it > will be added to the elementQueue. It will make the consumer be notified to > wake up frequently. > This causes the thread to keep busy to run and wake up, which leads to the > high sys and user cpu usage. > I think not all the SplitReader#fetch will block until there is data, if it > returns immediately when there is no data, then this problem will happen -- This message was sent by Atlassian Jira (v8.20.10#820010)
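To illustrate the point about KafkaConsumer above: its `poll(Duration)` already blocks for up to the given timeout when no records are available, so a split reader built on it does not busy-spin on an idle topic. The sketch below is generic and hedged; the bootstrap servers, topic and group names are placeholders, not taken from any Flink source implementation.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class BlockingPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // Blocks for up to 100 ms when the topic is idle instead of returning
                // immediately, which keeps an idle fetch loop from burning CPU.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
```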
[GitHub] [flink-ml] zhipeng93 opened a new pull request, #139: [FLINK-28803] Add Transformer and Estimator for KBinsDiscretizer
zhipeng93 opened a new pull request, #139: URL: https://github.com/apache/flink-ml/pull/139 ## What is the purpose of the change - Add Transformer and Estimator for KBinsDiscretizer[1] in Flink ML. ## Brief change log - Added Transformer and Estimator for KBinsDiscretizer. - Added Java test/example for Transformer and Estimator for KBinsDiscretizer. - Added Python source/test/example for KBinsDiscretizer. - Compared with sklearn [1], we made the following changes: - We removed the `encode` parameter since we can achieve the same goal with a one-hot encoder following KBinsDiscretizer. - We removed `randomState` since it is non-trivial to support reproducible sampling in a distributed setting, while in the single-node sklearn setting it is much easier. If users would like this feature, we can add it later. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (Java doc) [1] https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.KBinsDiscretizer.html -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-28699) Native rocksdb full snapshot in non-incremental checkpointing
[ https://issues.apache.org/jira/browse/FLINK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-28699. -- Fix Version/s: 1.16.0 Resolution: Fixed merged in master: d9a067e5e1c8672930b0ea7d76400a1d3020a1e2 > Native rocksdb full snapshot in non-incremental checkpointing > - > > Key: FLINK-28699 > URL: https://issues.apache.org/jira/browse/FLINK-28699 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.14.5, 1.15.1 >Reporter: Lihe Ma >Assignee: Lihe Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > When rocksdb statebackend is used and state.backend.incremental enabled, > flink will figure out newly created sst files generated by rocksdb during > checkpoint, and read all the states from rocksdb and write to files during > savepoint [1]. > When state.backend.incremental disabled, flink will read all the states from > rocksdb and generate state files in checkpoint and savepoint [2]. This makes > sense in savepoint, cause user can take a savepoint with rocksdb statebackend > and then restore it using another statebackend, but in checkpoint, > deserialisation and serialisation of state results in performance loss. > If the native rocksdb snapshot is introduced in full snapshot, theoretically > better performance can be achieved. At the same time, savepoint remains the > same as before. > > # > https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java > # > https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] fsk119 commented on a diff in pull request #20493: [FLINK-28632][sql-gateway][hive] Allow to GetColumns/GetPrimaryKeys/GetTableTypes in the HiveServer2 Endpoint.
fsk119 commented on code in PR #20493: URL: https://github.com/apache/flink/pull/20493#discussion_r940202528 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ## @@ -294,4 +408,64 @@ private static List getSupportedHiveType() { INTERVAL_YEAR_MONTH_TYPE, INTERVAL_DAY_TIME_TYPE)); } + +/** + * The column size for this type. For numeric data this is the maximum precision. For character + * data this is the length in characters. For datetime types this is the length in characters of + * the String representation (assuming the maximum allowed precision of the fractional seconds + * component). For binary data this is the length in bytes. Null is returned for for data types + * where the column size is not applicable. + */ +// TODO +private static Integer getColumnSize(Type hiveColumnType, LogicalType flinkColumnType) { +if (hiveColumnType.isNumericType()) { +// Exactly precision for DECIMAL_TYPE and maximum precision for others. +return hiveColumnType == Type.DECIMAL_TYPE +? ((DecimalType) flinkColumnType).getPrecision() +: hiveColumnType.getMaxPrecision(); +} +switch (hiveColumnType) { +case STRING_TYPE: +case BINARY_TYPE: +return Integer.MAX_VALUE; +case CHAR_TYPE: +case VARCHAR_TYPE: +return TypeInfoUtils.getCharacterLengthForType( +getPrimitiveTypeInfo(hiveColumnType.getName())); +case DATE_TYPE: +return 10; +case TIMESTAMP_TYPE: +return 29; +// case TIMESTAMPLOCALTZ_TYPE: +// return 31; +// 还是用flinkColumnType来实现? +default: +return null; +} +} + +/** + * The number of fractional digits for this type. Null is returned for data types where this is + * not applicable. + */ +private static Integer getDecimalDigits(Type hiveColumnType, LogicalType flinkColumnType) { Review Comment: ditto ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ## @@ -294,4 +408,64 @@ private static List getSupportedHiveType() { INTERVAL_YEAR_MONTH_TYPE, INTERVAL_DAY_TIME_TYPE)); } + +/** + * The column size for this type. For numeric data this is the maximum precision. For character + * data this is the length in characters. For datetime types this is the length in characters of + * the String representation (assuming the maximum allowed precision of the fractional seconds + * component). For binary data this is the length in bytes. Null is returned for for data types + * where the column size is not applicable. + */ +// TODO +private static Integer getColumnSize(Type hiveColumnType, LogicalType flinkColumnType) { Review Comment: I think it's better to use LogicalType only because the two inputs are equal. Add `@Nullable` for the return type. ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ## @@ -203,10 +229,98 @@ private static ResultSet executeGetTables( .collect(Collectors.toList())); } +private static ResultSet executeGetColumns( +SqlGatewayService service, +SessionHandle sessionHandle, +@Nullable String catalogName, +@Nullable String schemaName, +@Nullable String tableName, +@Nullable String columnName) { +String specifiedCatalogName = +isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName; +Set schemaNames = +filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); +Set tableKinds = new HashSet<>(Arrays.asList(TableKind.TABLE, TableKind.VIEW)); +List rowData = new ArrayList<>(); + +for (String schema : schemaNames) { +Set tableInfos = +filter( +service.listTables( +sessionHandle, specifiedCatalogName, schema, tableKinds), +candidates -> candidates.getIdentifier().getObjectName(), +tableName); + +for (TableInfo tableInfo : tableInfos) { +ResolvedCatalogBaseTable table = +service.getTable(sessionHandle, tableInfo.getIdentifier()); +List columns = table.getResolvedSchema().getColumns(); + +Set requiredColumnNames = +
[GitHub] [flink] bgeng777 commented on a diff in pull request #20508: [FLINK-28880][docs][cep] Fix wrong result of strict contiguity of looping patterns
bgeng777 commented on code in PR #20508: URL: https://github.com/apache/flink/pull/20508#discussion_r940855754 ## docs/content/docs/libs/cep.md: ## @@ -754,7 +754,7 @@ The contiguity will be applied between elements accepted into such a pattern. To illustrate the above with an example, a pattern sequence `"a b+ c"` (`"a"` followed by any(non-deterministic relaxed) sequence of one or more `"b"`'s followed by a `"c"`) with input `"a", "b1", "d1", "b2", "d2", "b3" "c"` will have the following results: - 1. **Strict Contiguity**: `{a b3 c}` -- the `"d1"` after `"b1"` causes `"b1"` to be discarded, the same happens for `"b2"` because of `"d2"`. + 1. **Strict Contiguity**: `{a b1 c}`, `{a b2 c}`, `{a b3 c}` - there's no adjacent `"b"`s. Review Comment: there's -> there are -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
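For orientation, here is a hedged sketch of the pattern the doc passage describes: strict contiguity inside the looping `b+` part is expressed with `consecutive()`, while `c` still only *follows* `b` with relaxed contiguity. The `Event` type and the name-based conditions are placeholders, not code from the docs.

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public final class ContiguitySketch {

    public static final class Event {
        public String name;
    }

    static IterativeCondition<Event> nameStartsWith(String prefix) {
        return new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event value, Context<Event> ctx) {
                return value.name != null && value.name.startsWith(prefix);
            }
        };
    }

    /** Pattern "a b+ c" where the accepted b's must be strictly contiguous with each other. */
    public static Pattern<Event, ?> pattern() {
        return Pattern.<Event>begin("a").where(nameStartsWith("a"))
                // consecutive() applies strict contiguity between the b's, but c still only
                // *follows* the b's, hence {a b1 c}, {a b2 c} and {a b3 c} all match.
                .followedBy("b").where(nameStartsWith("b")).oneOrMore().consecutive()
                .followedBy("c").where(nameStartsWith("c"));
    }
}
```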
[GitHub] [flink] Myasuka closed pull request #20399: [FLINK-28699][StateBackend] Native rocksdb full snapshot in non-incremental checkpointing
Myasuka closed pull request #20399: [FLINK-28699][StateBackend] Native rocksdb full snapshot in non-incremental checkpointing URL: https://github.com/apache/flink/pull/20399 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #20333: [FLINK-28623][network] Optimize the use of off heap memory by blocking and hybrid shuffle reader
reswqa commented on code in PR #20333: URL: https://github.com/apache/flink/pull/20333#discussion_r940855012 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java: ## @@ -70,6 +71,17 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler */ private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5); +/** Used to read buffers from file channel. */ Review Comment: fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28568) Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism
[ https://issues.apache.org/jira/browse/FLINK-28568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he closed FLINK-28568. -- Resolution: Fixed Fixed in master: 3f18cafa0581613ef9900da0478b3501617dc64f > Implements a new lookup join operator (sync mode only) with state to > eliminate the non determinism > -- > > Key: FLINK-28568 > URL: https://issues.apache.org/jira/browse/FLINK-28568 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-28568) Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism
[ https://issues.apache.org/jira/browse/FLINK-28568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-28568: -- Assignee: lincoln lee > Implements a new lookup join operator (sync mode only) with state to > eliminate the non determinism > -- > > Key: FLINK-28568 > URL: https://issues.apache.org/jira/browse/FLINK-28568 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] bgeng777 commented on pull request #20508: [FLINK-28880][docs][cep] Fix wrong result of strict contiguity of looping patterns
bgeng777 commented on PR #20508: URL: https://github.com/apache/flink/pull/20508#issuecomment-1208865870 LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] godfreyhe closed pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism
godfreyhe closed pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism URL: https://github.com/apache/flink/pull/20324 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] leonardBang commented on a diff in pull request #20489: [FLINK-28854][connector/jdbc] Migrate JDBC lookup table to the new LookupFunction and caching interface
leonardBang commented on code in PR #20489: URL: https://github.com/apache/flink/pull/20489#discussion_r940843642 ## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java: ## @@ -181,6 +179,27 @@ private JdbcDmlOptions getJdbcDmlOptions( .build(); } +@Nullable +private LookupCache getLookupCache(ReadableConfig tableOptions) { +LookupCache cache = null; +// Legacy cache options +if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0 +&& tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0) { +cache = +DefaultLookupCache.newBuilder() + .maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS)) + .expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL)) + .cacheMissingKey(tableOptions.get(LOOKUP_CACHE_MISSING_KEY)) +.build(); +} Review Comment: After we add fallbackKeys, we can use new introducing options ## flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java: ## @@ -45,30 +43,29 @@ /** Test suite for {@link JdbcRowDataLookupFunction}. */ public class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase { -private static String[] fieldNames = new String[] {"id1", "id2", "comment1", "comment2"}; -private static DataType[] fieldDataTypes = +private static final String[] fieldNames = new String[] {"id1", "id2", "comment1", "comment2"}; +private static final DataType[] fieldDataTypes = new DataType[] { DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING() }; -private static String[] lookupKeys = new String[] {"id1", "id2"}; - -@Test -public void testEval() throws Exception { +private static final String[] lookupKeys = new String[] {"id1", "id2"}; -JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder().build(); -JdbcRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions); +@ParameterizedTest(name = "withFailure = {0}") +@ValueSource(booleans = {false, true}) +public void testLookup(boolean withFailure) throws Exception { +JdbcRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(withFailure); ListOutputCollector collector = new ListOutputCollector(); lookupFunction.setCollector(collector); lookupFunction.open(null); lookupFunction.eval(1, StringData.fromString("1")); - -// close connection -lookupFunction.getDbConnection().close(); - +if (withFailure) { +// Close connection here, and this will be recovered by retry +lookupFunction.getDbConnection().close(); Review Comment: check null before call `close()` ## flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java: ## @@ -116,6 +117,8 @@ public class JdbcConnectorOptions { // Lookup options // - +/** @deprecated please use {@link LookupOptions#PARTIAL_CACHE_MAX_ROWS} instead. */ +@Deprecated public static final ConfigOption LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows") Review Comment: Could we add `lookup.cache.max-rows` to LookupOptions#PARTIAL_CACHE_MAX_ROWS 's fallbackKeys ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
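Regarding the fallback-key suggestion in the reviews above, this is a hedged sketch of how a legacy cache key could be wired as a fallback of a new-style option via `ConfigOption#withFallbackKeys`. The exact option names and types are assumptions for illustration, not the final connector code.

```java
import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class LookupCacheOptionsSketch {

    // Resolves from the new key first and falls back to the legacy key,
    // so existing DDL that still uses 'lookup.cache.max-rows' keeps working.
    public static final ConfigOption<Long> PARTIAL_CACHE_MAX_ROWS =
            ConfigOptions.key("lookup.partial-cache.max-rows")
                    .longType()
                    .noDefaultValue()
                    .withFallbackKeys("lookup.cache.max-rows")
                    .withDescription("Maximum number of rows to keep in the lookup cache.");

    public static final ConfigOption<Duration> PARTIAL_CACHE_EXPIRE_AFTER_WRITE =
            ConfigOptions.key("lookup.partial-cache.expire-after-write")
                    .durationType()
                    .noDefaultValue()
                    .withFallbackKeys("lookup.cache.ttl")
                    .withDescription("TTL after a row is written into the lookup cache.");
}
```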
[GitHub] [flink] godfreyhe commented on pull request #20324: [FLINK-28568][table-runtime] Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism
godfreyhe commented on PR #20324: URL: https://github.com/apache/flink/pull/20324#issuecomment-1208864940 > An irrelevant failure case of es sink https://issues.apache.org/jira/browse/FLINK-28877 I will merge it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28880) Fix CEP doc with wrong result of strict contiguity of looping patterns
[ https://issues.apache.org/jira/browse/FLINK-28880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28880: --- Labels: pull-request-available (was: ) > Fix CEP doc with wrong result of strict contiguity of looping patterns > -- > > Key: FLINK-28880 > URL: https://issues.apache.org/jira/browse/FLINK-28880 > Project: Flink > Issue Type: Bug > Components: Library / CEP >Affects Versions: 1.15.1 >Reporter: Juntao Hu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0 > > > https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/cep/#contiguity-within-looping-patterns > The result of strict contiguity should be {a b1 c}, {a b2 c}, {a b3 c}, since > b is *followed by* c. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] Vancior opened a new pull request, #20508: [FLINK-28880][docs][cep] Fix wrong result of strict contiguity of looping patterns
Vancior opened a new pull request, #20508: URL: https://github.com/apache/flink/pull/20508 ## What is the purpose of the change This PR fixes CEP doc. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (no applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #20507: [FLINK-28139][docs] Add documentation for speculative execution
flinkbot commented on PR #20507: URL: https://github.com/apache/flink/pull/20507#issuecomment-1208863205 ## CI report: * 9d67f8ac91de12838b909c38e995455983c5d32f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-28843) Fail to find incremental handle when restoring from changelog checkpoint in claim mode
[ https://issues.apache.org/jira/browse/FLINK-28843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang resolved FLINK-28843. -- Fix Version/s: 1.16.0 Resolution: Fixed merged in master: 7f708d0ba42f727b3f8c3d77cef2108206cad2de > Fail to find incremental handle when restoring from changelog checkpoint in > claim mode > -- > > Key: FLINK-28843 > URL: https://issues.apache.org/jira/browse/FLINK-28843 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.15.0, 1.15.1 >Reporter: Lihe Ma >Assignee: Lihe Ma >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > # When native checkpoint is enabled and incremental checkpointing is enabled > in rocksdb statebackend,if state data is greater than > state.storage.fs.memory-threshold,it will be stored in a data file > (FileStateHandle,RelativeFileStateHandle, etc) rather than stored with > ByteStreamStateHandle in checkpoint metadata, like base-path1/chk-1/file1. > # Then restore the job from base-path1/chk-1 in claim mode,using changelog > statebackend,and the checkpoint path is set to base-path2, then new > checkpoint will be saved in base-path2/chk-2, previous checkpoint file > (base-path1/chk-1/file1) is needed. > # Then restore the job from base-path2/chk-2 in changelog statebackend, > flink will try to read base-path2/chk-2/file1, rather than the actual file > location base-path1/chk-1/file1, which leads to FileNotFoundException and job > failed. > > How to reproduce? > # Set state.storage.fs.memory-threshold to a small value, like '20b'. > # {{run > org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase#testSwitchFromDisablingToEnablingInClaimMode}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] Myasuka merged pull request #20484: [FLINK-28843][StateBackend] Fix restore from incremental checkpoint with changelog checkpoint in claim mode
Myasuka merged PR #20484: URL: https://github.com/apache/flink/pull/20484 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-28880) Fix CEP doc with wrong result of strict contiguity of looping patterns
Juntao Hu created FLINK-28880: - Summary: Fix CEP doc with wrong result of strict contiguity of looping patterns Key: FLINK-28880 URL: https://issues.apache.org/jira/browse/FLINK-28880 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.15.1 Reporter: Juntao Hu Fix For: 1.16.0 https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/cep/#contiguity-within-looping-patterns The result of strict contiguity should be {a b1 c}, {a b2 c}, {a b3 c}, since b is *followed by* c. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28139) Add documentation for speculative execution
[ https://issues.apache.org/jira/browse/FLINK-28139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28139: --- Labels: pull-request-available (was: ) > Add documentation for speculative execution > --- > > Key: FLINK-28139 > URL: https://issues.apache.org/jira/browse/FLINK-28139 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zhuzhurk opened a new pull request, #20507: [FLINK-28139][docs] Add documentation for speculative execution
zhuzhurk opened a new pull request, #20507: URL: https://github.com/apache/flink/pull/20507 ## What is the purpose of the change This PR adds documentation for speculative execution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] HuangXingBo commented on a diff in pull request #20505: [FLINK-28876][python][format/orc] Support writing RowData into Orc files
HuangXingBo commented on code in PR #20505: URL: https://github.com/apache/flink/pull/20505#discussion_r940834537 ## flink-python/pyflink/datastream/formats/orc.py: ## @@ -0,0 +1,104 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional + +from pyflink.common import Configuration +from pyflink.datastream.connectors.file_system import BulkWriterFactory, RowDataBulkWriterFactory +from pyflink.datastream.utils import create_hadoop_configuration +from pyflink.java_gateway import get_gateway +from pyflink.table.types import _to_java_data_type, RowType +from pyflink.util.java_utils import to_jarray + + +class OrcBulkWriters(object): +""" +Convenient builder to create a :class:`BulkWriterFactory` that writes Row records with a defined +:class:`RowType` into Orc files in a batch fashion. + +.. versionadded:: 1.16.0 +""" + +@staticmethod +def for_row_data_vectorization(row_type: RowType, + writer_properties: Optional[Configuration] = None, + hadoop_config: Optional[Configuration] = None) \ +-> BulkWriterFactory: +""" +Create a :class:`RowDataBulkWriterFactory` that writes Row records with a defined +:class:`RowType` into Orc files in a batch fashion. + +Example: +:: + +>>> row_type = DataTypes.ROW([ +... DataTypes.FIELD('string', DataTypes.STRING()), +... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())) +... ]) +>>> row_type_info = Types.ROW_NAMED( +... ['string', 'int_array'], +... [Types.STRING(), Types.LIST(Types.INT())] +... ) +>>> sink = FileSink.for_bulk_format( +... OUTPUT_DIR, OrcBulkWriters.for_row_data_vectorization( +... row_type=row_type, +... writer_properties=Configuration(), +... hadoop_config=Configuration(), +... ) +... ).build() +>>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink) + +Note that in the above example, an identity map to indicate its :class:`RowTypeInfo` is +necessary before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, +because :class:`RowDataBulkWriterFactory` assumes the input record type is :class:`Row`. 
+""" +if not isinstance(row_type, RowType): +raise TypeError('row_type must be an instance of RowType') + +j_data_type = _to_java_data_type(row_type) +jvm = get_gateway().jvm +j_row_type = j_data_type.getLogicalType() +orc_types = to_jarray( +jvm.org.apache.flink.table.types.logical.LogicalType, +[i for i in j_row_type.getChildren()] +) +type_description = jvm.org.apache.flink.orc \ +.OrcSplitReaderUtil.logicalTypeToOrcType(j_row_type) +if writer_properties is None: +writer_properties = Configuration() +if hadoop_config is None: +hadoop_config = Configuration() + +return RowDataBulkWriterFactory( +jvm.org.apache.flink.orc.writer.OrcBulkWriterFactory( +jvm.org.apache.flink.orc.vector.RowDataVectorizer( +type_description.toString(), +orc_types +), +OrcBulkWriters._create_properties(writer_properties), +create_hadoop_configuration(hadoop_config) +), +row_type +) + +@staticmethod +def _create_properties(conf: Configuration): Review Comment: this method can be moved to `datastream.utils`? ## docs/content/docs/connectors/datastream/filesystem.md: ## @@ -792,6 +792,36 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) { {{< /tab >}} {{< /tabs >}} +For PyFlink users, `OrcBulkWriters.for_row_data_vectoriza
[GitHub] [flink] godfreyhe commented on a diff in pull request #20501: Catalog get statistics by partitions
godfreyhe commented on code in PR #20501: URL: https://github.com/apache/flink/pull/20501#discussion_r940832517 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java: ## @@ -90,6 +102,257 @@ public static Map createCatalogColumnSt return colStats; } +public static Map createCatalogPartitionColumnStats( +HiveMetastoreClientWrapper client, +HiveShim hiveShim, +Table hiveTable, +String partitionName, +List partitionColsSchema, +String defaultPartitionName) { +Map partitionColumnStats = new HashMap<>(); +List partitionCols = new ArrayList<>(partitionColsSchema.size()); +List partitionColsType = new ArrayList<>(partitionColsSchema.size()); +for (FieldSchema fieldSchema : partitionColsSchema) { +partitionCols.add(fieldSchema.getName()); +partitionColsType.add( +HiveTypeUtil.toFlinkType( + TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType())) +.getLogicalType()); +} + +// the partition column and values for the partition column +Map partitionColValues = new HashMap<>(); +CatalogPartitionSpec partitionSpec = +HivePartitionUtils.createPartitionSpec(partitionName, defaultPartitionName); +for (int i = 0; i < partitionCols.size(); i++) { +String partitionCol = partitionCols.get(i); +String partitionStrVal = partitionSpec.getPartitionSpec().get(partitionCols.get(i)); +if (partitionStrVal == null) { +partitionColValues.put(partitionCol, null); +} else { +partitionColValues.put( +partitionCol, +HivePartitionUtils.restorePartitionValueFromType( +hiveShim, +partitionStrVal, +partitionColsType.get(i), +defaultPartitionName)); +} +} + +// calculate statistic for each partition column +for (int i = 0; i < partitionCols.size(); i++) { +Object partitionValue = partitionColValues.get(partitionCols.get(i)); +LogicalType logicalType = partitionColsType.get(i); +CatalogColumnStatisticsDataBase catalogColumnStatistics = +getColumnStatistics( +client, +hiveTable, +logicalType, +partitionValue, +i, +defaultPartitionName); +if (catalogColumnStatistics != null) { +partitionColumnStats.put(partitionCols.get(i), catalogColumnStatistics); +} +} + +return partitionColumnStats; +} + +/** + * Get statistics for specific partition column. + * + * @param logicalType the specific partition column's logical type + * @param partitionValue the partition value for the specific partition column + * @param partitionColIndex the index of the specific partition + * @param defaultPartitionName the default partition name for null value + */ +private static CatalogColumnStatisticsDataBase getColumnStatistics( +HiveMetastoreClientWrapper client, +Table hiveTable, +LogicalType logicalType, +Object partitionValue, +int partitionColIndex, +String defaultPartitionName) { +switch (logicalType.getTypeRoot()) { +case CHAR: +case VARCHAR: +{ +Long maxLength = null; +Double avgLength = null; +Long nullCount = 0L; +if (partitionValue == null) { +nullCount = +getNullCount( +client, hiveTable, partitionColIndex, defaultPartitionName); +} else { +long valLength = ((String) partitionValue).length(); +maxLength = valLength; +avgLength = (double) valLength; +} +return new CatalogColumnStatisticsDataString( +maxLength, avgLength, 1L, nullCount); +} +case BOOLEAN: +{ +long trueCount = 0L; +long falseCount = 0L; +Long nullCount = 0L; +if (partitionValue == null) { +nullCount = +getNullCount( +client, hiveTable, partitionColIndex,
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20491: [FLINK-28857][docs] Add Document for DataStream Cache API
gaoyunhaii commented on code in PR #20491: URL: https://github.com/apache/flink/pull/20491#discussion_r940831093 ## docs/content/docs/dev/datastream/operators/overview.md: ## @@ -575,6 +575,44 @@ This feature is not yet supported in Python {{< /tab >}} {{< /tabs>}} +### Cache + DataStream → CachedDataStream + +Cache the intermediate result of the transformation. Currently, only supported with batch execution Review Comment: It is only supported that... with the batch execution mode? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
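A minimal sketch of how the cache operator documented in this PR can be used, assuming the 1.16 `cache()`/`invalidate()` API on `SingleOutputStreamOperator` and batch execution mode (element values and the pipeline itself are illustrative):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CacheExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Caching the intermediate result is only supported in batch execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        SingleOutputStreamOperator<Integer> doubled =
                env.fromElements(1, 2, 3, 4, 5).map(i -> i * 2).returns(Types.INT);

        // The first job that consumes the cached stream materializes the intermediate result.
        CachedDataStream<Integer> cached = doubled.cache();
        cached.print();
        env.execute();

        // Later jobs in the same environment reuse the cached result instead of recomputing it.
        cached.map(i -> i + 1).returns(Types.INT).print();
        env.execute();

        // Release the cached intermediate result when it is no longer needed.
        cached.invalidate();
    }
}
```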
[jira] [Closed] (FLINK-28753) Improve FilterIntoJoinRule which could push some predicate to another side
[ https://issues.apache.org/jira/browse/FLINK-28753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he closed FLINK-28753. -- Resolution: Fixed Fixed in master: 0e6e4198ad84227c20e2c61c2dd8b0616324aa31 > Improve FilterIntoJoinRule which could push some predicate to another side > -- > > Key: FLINK-28753 > URL: https://issues.apache.org/jira/browse/FLINK-28753 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > for sql: SELECT * FROM MyTable1 join MyTable2 ON a1 = a2 AND a1 = 2 > {{a1 = 2}} can be pushed into both left side and right side. but currently > only left side will be pushed by FilterIntoJoinRule. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] libenchao commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown
libenchao commented on PR #20140: URL: https://github.com/apache/flink/pull/20140#issuecomment-1208843267 > I believe JdbcFilterPushdownVisitor needs to produce strings or some equivalent data, so that it can be used by JdbcDynamicTableSource::getScanRuntimeProvider, how should I make use of PrepareStatement here? Maybe I am missing something? @qingwei91 Currently `JdbcRowDataInputFormat` already uses `PreparedStatement`, and 'scan.partition' is implemented using it, so we can do it. `JdbcFilterPushdownVisitor` does not necessarily need to return `String`; it can return anything we need. We may need to rethink the design, especially the following points:
1. What functions should we support, e.g. `IN`, `BETWEEN`?
2. Are these functions supported in all JDBC dialects, and do they use the same operator? If not, how should we abstract it out to make it dialect-specific?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
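As an illustration of the `PreparedStatement`-based direction discussed above, a hypothetical sketch (the class name and structure are made up for this example, not Flink's actual design): the pushdown visitor could return a SQL fragment with `?` placeholders plus the values to bind, instead of a fully rendered string:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical holder for a pushed-down filter: a SQL fragment plus its bind values. */
public final class ParameterizedPredicate {

    private final String sqlFragment;      // e.g. "id = ? AND name LIKE ?"
    private final List<Object> parameters; // values for the placeholders, in order

    public ParameterizedPredicate(String sqlFragment, List<Object> parameters) {
        this.sqlFragment = sqlFragment;
        this.parameters = parameters;
    }

    /** Appends the fragment to a base query and binds the parameters on the statement. */
    public PreparedStatement bind(Connection connection, String baseQuery) throws SQLException {
        PreparedStatement statement =
                connection.prepareStatement(baseQuery + " WHERE " + sqlFragment);
        for (int i = 0; i < parameters.size(); i++) {
            statement.setObject(i + 1, parameters.get(i)); // JDBC parameter indices are 1-based
        }
        return statement;
    }

    public static ParameterizedPredicate example() {
        // A dialect-specific visitor could produce fragments like this instead of plain strings.
        return new ParameterizedPredicate("id = ? AND name LIKE ?", Arrays.asList(42, "flink%"));
    }
}
```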
[GitHub] [flink] Grypse commented on pull request #20467: [FLINK-28825][k8s] Add K8S pod scheduler into Kubernetes options
Grypse commented on PR #20467: URL: https://github.com/apache/flink/pull/20467#issuecomment-1208842402 The scheduler can simply be set with `schedulerName` in the podTemplate; I think there is no need to add options to the Kubernetes config.
podTemplate:
  apiVersion: v1
  kind: Pod
  metadata:
    name: task-manager-pod-template
    namespace: flink-application-system
  spec:
    schedulerName: default-scheduler
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] godfreyhe closed pull request #20432: [FLINK-28753][table-planner] Improve FilterIntoJoinRule which could push some predicates to another side
godfreyhe closed pull request #20432: [FLINK-28753][table-planner] Improve FilterIntoJoinRule which could push some predicates to another side URL: https://github.com/apache/flink/pull/20432 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-28183) flink-python is lacking several test dependencies
[ https://issues.apache.org/jira/browse/FLINK-28183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-28183. --- Resolution: Fixed Merged to master via 5506930cc79eb131c66c5df0320045ad53437dce > flink-python is lacking several test dependencies > - > > Key: FLINK-28183 > URL: https://issues.apache.org/jira/browse/FLINK-28183 > Project: Flink > Issue Type: Technical Debt > Components: API / Python, Build System >Affects Versions: 1.16.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > The pyflink_gateway_server searches the output directories of various modules > to construct a test classpath. > Half of these are not declared as actual test dependencies in maven. Because > of that there are no guarantees that these modules are actually built before > flink-python. > Additionally there seem to be no safeguards in place to verify that these > jars actually exist. > Considering that this is only required for testing most of this logic should > also be moved into maven, copying these dependencies to some directory under > flink-python/target, to make this de-facto build logic more discoverable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-24095) Elasticsearch7DynamicSinkITCase.testWritingDocuments fails due to socket timeout
[ https://issues.apache.org/jira/browse/FLINK-24095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577090#comment-17577090 ] Huang Xingbo edited comment on FLINK-24095 at 8/9/22 2:24 AM: -- 1.16 instance: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39617&view=logs&j=4eda0b4a-bd0d-521a-0916-8285b9be9bb5&t=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9 was (Author: hxbks2ks): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39617&view=logs&j=4eda0b4a-bd0d-521a-0916-8285b9be9bb5&t=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9 > Elasticsearch7DynamicSinkITCase.testWritingDocuments fails due to socket > timeout > > > Key: FLINK-24095 > URL: https://issues.apache.org/jira/browse/FLINK-24095 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.14.0, 1.15.0 >Reporter: Xintong Song >Priority: Major > Labels: test-stability > Fix For: 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23250&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=12781 > {code} > Aug 31 23:06:22 Caused by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-3 [ACTIVE] > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808) > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.performRequest(RestClient.java:248) > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1499) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:720) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:138) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:46) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:318) > Aug 31 23:06:22 at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) > Aug 31 23:06:22 at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) > Aug 31 23:06:22 at > org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:691) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:667) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:639) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > Aug 31 23:06:22 at > 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > Aug 31 23:06:22 at java.lang.Thread.run(Thread.java:748) > Aug 31 23:06:22 Caused by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-3 [ACTIVE] > Aug 31 23:06:22 at > org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) > Aug 31 23:06:22 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) > Aug 31 23:06:22 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.time
[jira] [Updated] (FLINK-24095) Elasticsearch7DynamicSinkITCase.testWritingDocuments fails due to socket timeout
[ https://issues.apache.org/jira/browse/FLINK-24095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-24095: - Affects Version/s: 1.16.0 > Elasticsearch7DynamicSinkITCase.testWritingDocuments fails due to socket > timeout > > > Key: FLINK-24095 > URL: https://issues.apache.org/jira/browse/FLINK-24095 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.14.0, 1.15.0, 1.16.0 >Reporter: Xintong Song >Priority: Major > Labels: test-stability > Fix For: 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23250&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=12781 > {code} > Aug 31 23:06:22 Caused by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-3 [ACTIVE] > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808) > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.performRequest(RestClient.java:248) > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1499) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:720) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:138) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:46) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:318) > Aug 31 23:06:22 at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) > Aug 31 23:06:22 at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) > Aug 31 23:06:22 at > org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:691) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:667) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:639) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > Aug 31 23:06:22 at java.lang.Thread.run(Thread.java:748) > Aug 31 23:06:22 Caused by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-3 [ACTIVE] > Aug 31 23:06:22 at > 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) > Aug 31 23:06:22 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) > Aug 31 23:06:22 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.Abs
[jira] [Commented] (FLINK-24095) Elasticsearch7DynamicSinkITCase.testWritingDocuments fails due to socket timeout
[ https://issues.apache.org/jira/browse/FLINK-24095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577090#comment-17577090 ] Huang Xingbo commented on FLINK-24095: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39617&view=logs&j=4eda0b4a-bd0d-521a-0916-8285b9be9bb5&t=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9 > Elasticsearch7DynamicSinkITCase.testWritingDocuments fails due to socket > timeout > > > Key: FLINK-24095 > URL: https://issues.apache.org/jira/browse/FLINK-24095 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch >Affects Versions: 1.14.0, 1.15.0 >Reporter: Xintong Song >Priority: Major > Labels: test-stability > Fix For: 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23250&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d&l=12781 > {code} > Aug 31 23:06:22 Caused by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-3 [ACTIVE] > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808) > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.performRequest(RestClient.java:248) > Aug 31 23:06:22 at > org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1499) > Aug 31 23:06:22 at > org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:720) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:138) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:46) > Aug 31 23:06:22 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:318) > Aug 31 23:06:22 at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) > Aug 31 23:06:22 at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) > Aug 31 23:06:22 at > org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:691) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:667) > Aug 31 23:06:22 at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:639) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > Aug 31 23:06:22 at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > Aug 31 23:06:22 at java.lang.Thread.run(Thread.java:748) > Aug 31 23:06:22 Caused 
by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-3 [ACTIVE] > Aug 31 23:06:22 at > org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) > Aug 31 23:06:22 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) > Aug 31 23:06:22 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) > Aug 31 23:06:22 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.
[jira] [Closed] (FLINK-28877) Elasticsearch7DynamicSinkITCase.testWritingDocumentsNoPrimaryKey case failed
[ https://issues.apache.org/jira/browse/FLINK-28877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo closed FLINK-28877. Resolution: Duplicate > Elasticsearch7DynamicSinkITCase.testWritingDocumentsNoPrimaryKey case failed > > > Key: FLINK-28877 > URL: https://issues.apache.org/jira/browse/FLINK-28877 > Project: Flink > Issue Type: Bug >Reporter: lincoln lee >Priority: Major > Fix For: 1.16.0 > > > {code} > Aug 08 16:00:39 Caused by: java.lang.RuntimeException: An error occurred in > ElasticsearchSink. > Aug 08 16:00:39 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:426) > > Aug 08 16:00:39 at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:365) > > Aug 08 16:00:39 at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > > Aug 08 16:00:39 at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) > > Aug 08 16:00:39 at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) > > Aug 08 16:00:39 at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) > > Aug 08 16:00:39 at > org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1022) > > Aug 08 16:00:39 at > org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:900) > > Aug 08 16:00:39 at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:783) > > Aug 08 16:00:39 at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > > Aug 08 16:00:39 at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) > Aug 08 16:00:39 at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > Aug 08 16:00:39 at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > Aug 08 16:00:39 at java.lang.Thread.run(Thread.java:748) > Aug 08 16:00:39 Caused by: java.net.SocketTimeoutException: 30,000 > milliseconds timeout on connection http-outgoing-0 [ACTIVE] > Aug 08 16:00:39 at > org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) > > Aug 08 16:00:39 at > org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) > {code} > > 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39617&view=logs&j=4eda0b4a-bd0d-521a-0916-8285b9be9bb5&t=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] dianfu closed pull request #20470: [FLINK-28183][python] Model python test dependencies in Maven
dianfu closed pull request #20470: [FLINK-28183][python] Model python test dependencies in Maven URL: https://github.com/apache/flink/pull/20470 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-28860) CacheITCase.testBatchProduceCacheStreamConsume failed
[ https://issues.apache.org/jira/browse/FLINK-28860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo reassigned FLINK-28860: Assignee: Xuannan Su > CacheITCase.testBatchProduceCacheStreamConsume failed > - > > Key: FLINK-28860 > URL: https://issues.apache.org/jira/browse/FLINK-28860 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Xuannan Su >Priority: Critical > Labels: test-stability > Fix For: 1.16.0 > > > {code:java} > 2022-08-08T03:27:22.1988575Z Aug 08 03:27:22 [ERROR] > org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(Path) > Time elapsed: 0.593 s <<< ERROR! > 2022-08-08T03:27:22.1989338Z Aug 08 03:27:22 java.lang.RuntimeException: > Producing cache IntermediateResult is not supported in streaming mode > 2022-08-08T03:27:22.1990401Z Aug 08 03:27:22 at > org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:75) > 2022-08-08T03:27:22.1991511Z Aug 08 03:27:22 at > org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:42) > 2022-08-08T03:27:22.1993671Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) > 2022-08-08T03:27:22.1994900Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:830) > 2022-08-08T03:27:22.1995748Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560) > 2022-08-08T03:27:22.1996932Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:851) > 2022-08-08T03:27:22.1998562Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:809) > 2022-08-08T03:27:22.1999581Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560) > 2022-08-08T03:27:22.2000376Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) > 2022-08-08T03:27:22.2001359Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2250) > 2022-08-08T03:27:22.2002767Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2241) > 2022-08-08T03:27:22.2004121Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2227) > 2022-08-08T03:27:22.2005059Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2178) > 2022-08-08T03:27:22.2005939Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1469) > 2022-08-08T03:27:22.2006735Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1334) > 2022-08-08T03:27:22.2007500Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320) > 2022-08-08T03:27:22.2008315Z Aug 08 03:27:22 at > 
org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(CacheITCase.java:190) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39518&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28860) CacheITCase.testBatchProduceCacheStreamConsume failed
[ https://issues.apache.org/jira/browse/FLINK-28860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577084#comment-17577084 ] Xuannan Su commented on FLINK-28860: [~hxbks2ks]I will take a look at it. Could you assign the ticket to me? > CacheITCase.testBatchProduceCacheStreamConsume failed > - > > Key: FLINK-28860 > URL: https://issues.apache.org/jira/browse/FLINK-28860 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: test-stability > Fix For: 1.16.0 > > > {code:java} > 2022-08-08T03:27:22.1988575Z Aug 08 03:27:22 [ERROR] > org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(Path) > Time elapsed: 0.593 s <<< ERROR! > 2022-08-08T03:27:22.1989338Z Aug 08 03:27:22 java.lang.RuntimeException: > Producing cache IntermediateResult is not supported in streaming mode > 2022-08-08T03:27:22.1990401Z Aug 08 03:27:22 at > org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:75) > 2022-08-08T03:27:22.1991511Z Aug 08 03:27:22 at > org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:42) > 2022-08-08T03:27:22.1993671Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) > 2022-08-08T03:27:22.1994900Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:830) > 2022-08-08T03:27:22.1995748Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560) > 2022-08-08T03:27:22.1996932Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:851) > 2022-08-08T03:27:22.1998562Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:809) > 2022-08-08T03:27:22.1999581Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560) > 2022-08-08T03:27:22.2000376Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) > 2022-08-08T03:27:22.2001359Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2250) > 2022-08-08T03:27:22.2002767Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2241) > 2022-08-08T03:27:22.2004121Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2227) > 2022-08-08T03:27:22.2005059Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2178) > 2022-08-08T03:27:22.2005939Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1469) > 2022-08-08T03:27:22.2006735Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1334) > 2022-08-08T03:27:22.2007500Z Aug 08 03:27:22 at > 
org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320) > 2022-08-08T03:27:22.2008315Z Aug 08 03:27:22 at > org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(CacheITCase.java:190) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39518&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28529) ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode failed with CheckpointException: Checkpoint expired before comple
[ https://issues.apache.org/jira/browse/FLINK-28529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577082#comment-17577082 ] Huang Xingbo commented on FLINK-28529: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39675&view=logs&j=b0a398c0-685b-599c-eb57-c8c2a771138e&t=747432ad-a576-5911-1e2a-68c6bedc248a > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > failed with CheckpointException: Checkpoint expired before completing > --- > > Key: FLINK-28529 > URL: https://issues.apache.org/jira/browse/FLINK-28529 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Yanfei Lei >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.16.0 > > > {code:java} > 2022-07-12T04:30:49.9912088Z Jul 12 04:30:49 [ERROR] > ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode > Time elapsed: 617.048 s <<< ERROR! > 2022-07-12T04:30:49.9913108Z Jul 12 04:30:49 > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired > before completing. > 2022-07-12T04:30:49.9913880Z Jul 12 04:30:49 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > 2022-07-12T04:30:49.9914606Z Jul 12 04:30:49 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > 2022-07-12T04:30:49.9915572Z Jul 12 04:30:49 at > org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationSwitchStateBackendITCase.testSwitchFromDisablingToEnablingInClaimMode(ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:125) > 2022-07-12T04:30:49.9916483Z Jul 12 04:30:49 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-07-12T04:30:49.9917377Z Jul 12 04:30:49 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-07-12T04:30:49.9918121Z Jul 12 04:30:49 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-07-12T04:30:49.9918788Z Jul 12 04:30:49 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-07-12T04:30:49.9919456Z Jul 12 04:30:49 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-07-12T04:30:49.9920193Z Jul 12 04:30:49 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-07-12T04:30:49.9920923Z Jul 12 04:30:49 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-07-12T04:30:49.9921630Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-07-12T04:30:49.9922326Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-07-12T04:30:49.9923023Z Jul 12 04:30:49 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2022-07-12T04:30:49.9923708Z Jul 12 04:30:49 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-07-12T04:30:49.9924449Z Jul 12 04:30:49 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-07-12T04:30:49.9925124Z Jul 12 04:30:49 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-07-12T04:30:49.9925912Z Jul 12 04:30:49 at > 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-07-12T04:30:49.9926742Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-07-12T04:30:49.9928142Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-07-12T04:30:49.9928715Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-07-12T04:30:49.9929311Z Jul 12 04:30:49 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-07-12T04:30:49.9929863Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-07-12T04:30:49.9930376Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-07-12T04:30:49.9930911Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-07-12T04:30:49.9931441Z Jul 12 04:30:49 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-07-12T04:30:49.9931975Z Jul 12 04:30:49 at
[jira] [Commented] (FLINK-28855) ThriftObjectConversions compile failed
[ https://issues.apache.org/jira/browse/FLINK-28855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577080#comment-17577080 ] Huang Xingbo commented on FLINK-28855: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39675&view=logs&j=b1fcf054-9138-5463-c73c-a49979b9ac2a&t=9291ac46-dd95-5135-b799-3839e65a8691&l=7350 > ThriftObjectConversions compile failed > -- > > Key: FLINK-28855 > URL: https://issues.apache.org/jira/browse/FLINK-28855 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: yuzelin >Priority: Critical > Labels: test-stability > Fix For: 1.16.0 > > > {code:java} > 2022-08-08T00:32:45.5104326Z [ERROR] > /home/vsts/work/1/s/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:[615,31] > cannot find symbol > 2022-08-08T00:32:45.5105191Z symbol: variable INDEX_TABLE > 2022-08-08T00:32:45.5107273Z location: class > org.apache.hadoop.hive.metastore.TableType > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39514&view=logs&j=87489130-75dc-54e4-1f45-80c30aa367a3&t=4632ba9d-f1f2-5ad2-13fc-828d0e28bac4 -- This message was sent by Atlassian Jira (v8.20.10#820010)