[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
[ https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726773#comment-17726773 ] Sergey Nuyanzin commented on FLINK-30629: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49421=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9992 > ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable > - > > Key: FLINK-30629 > URL: https://issues.apache.org/jira/browse/FLINK-30629 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.17.0, 1.18.0 >Reporter: Xintong Song >Assignee: Weijie Guo >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.17.0 > > Attachments: ClientHeartbeatTestLog.txt > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819 > {code:java} > Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 21.02 s <<< FAILURE! - in > org.apache.flink.client.ClientHeartbeatTest > Jan 11 04:32:39 [ERROR] > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat > Time elapsed: 9.157 s <<< ERROR! > Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet > running or has already been shut down. 
> Jan 11 04:32:39 at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841) > Jan 11 04:32:39 at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) > Jan 11 04:32:39 at > org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
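The stack trace suggests that {{getJobStatus()}} raced with the MiniCluster shutdown triggered by the missed client heartbeat. Flaky tests of this shape are often hardened by polling a condition with a deadline instead of asserting immediately. A generic sketch using only the JDK (the helper name is hypothetical, not one of Flink's test utilities):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: poll a condition until it holds or a deadline passes. */
public final class WaitUtil {
    public static boolean waitUntil(BooleanSupplier condition, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(50); // back off briefly between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }
}
```

In a test like this one, the condition would wrap the status query (e.g. "job reached RUNNING") so the assertion only fires while the cluster is known to be up, rather than at a fixed instant.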
[jira] [Updated] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
[ https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-32176: --- Attachment: (was: Screenshot 2023-05-27 at 9.00.53 AM.png) > [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all > - > > Key: FLINK-32176 > URL: https://issues.apache.org/jira/browse/FLINK-32176 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar > Affects Versions: 1.17.0 > Reporter: Samrat Deb > Assignee: Samrat Deb > Priority: Major > Labels: pull-request-available > > > * *CVE ID:* {{CVE-2022-1471}} > * *CWE:* CWE-502 Deserialization of Untrusted Data > * *Severity:* Critical > {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}} > > SnakeYaml's Constructor() class does not restrict the types that can be instantiated during deserialization. Deserializing YAML content provided by an attacker can lead to remote code execution. We recommend using SnakeYaml's SafeConstructor when parsing untrusted content to restrict deserialization. > > More details: https://nvd.nist.gov/vuln/detail/CVE-2022-1471 -- This message was sent by Atlassian Jira (v8.20.10#820010)
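The advisory quoted above recommends SafeConstructor. A minimal sketch of the difference, assuming the plain `org.yaml.snakeyaml` 1.x API (the classes shaded inside `pulsar-client-all` live under a relocated package, so this is illustrative only, not the connector's actual code path):

```java
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;

public class SafeYamlSketch {
    public static void main(String[] args) {
        // Vulnerable pattern: new Yaml() uses the default Constructor, which
        // can instantiate arbitrary classes from attacker-controlled tags
        // such as !!javax.script.ScriptEngineManager.
        //
        // Safer pattern: SafeConstructor only builds standard types (maps,
        // lists, strings, numbers), so arbitrary-class tags are rejected.
        Yaml yaml = new Yaml(new SafeConstructor());
        Object doc = yaml.load("key: value");
        System.out.println(doc); // a plain map, never an arbitrary object
    }
}
```

Note that in SnakeYaml 2.x the `SafeConstructor` constructor takes a `LoaderOptions` argument; the no-arg form above matches the 1.x line (1.30 is the version shaded here).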
[jira] [Comment Edited] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
[ https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726765#comment-17726765 ] Samrat Deb edited comment on FLINK-32176 at 5/27/23 3:31 AM: - Please help trigger the workflow for the PR so it can complete the tests; it needs approval from maintainers/committers. was (Author: samrat007): Please help trigger the workflow for the PR to complete; it needs approval from maintainers/committers. > [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all > - > > Key: FLINK-32176 > URL: https://issues.apache.org/jira/browse/FLINK-32176 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar > Affects Versions: 1.17.0 > Reporter: Samrat Deb > Assignee: Samrat Deb > Priority: Major > Labels: pull-request-available > > > * *CVE ID:* {{CVE-2022-1471}} > * *CWE:* CWE-502 Deserialization of Untrusted Data > * *Severity:* Critical > {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}} > > SnakeYaml's Constructor() class does not restrict the types that can be instantiated during deserialization. Deserializing YAML content provided by an attacker can lead to remote code execution. We recommend using SnakeYaml's SafeConstructor when parsing untrusted content to restrict deserialization. > > More details: https://nvd.nist.gov/vuln/detail/CVE-2022-1471 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
[ https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726765#comment-17726765 ] Samrat Deb commented on FLINK-32176: Please help trigger the workflow for the PR to complete; it needs approval from maintainers/committers. > [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all > - > > Key: FLINK-32176 > URL: https://issues.apache.org/jira/browse/FLINK-32176 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar > Affects Versions: 1.17.0 > Reporter: Samrat Deb > Assignee: Samrat Deb > Priority: Major > Labels: pull-request-available > > > * *CVE ID:* {{CVE-2022-1471}} > * *CWE:* CWE-502 Deserialization of Untrusted Data > * *Severity:* Critical > {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}} > > SnakeYaml's Constructor() class does not restrict the types that can be instantiated during deserialization. Deserializing YAML content provided by an attacker can lead to remote code execution. We recommend using SnakeYaml's SafeConstructor when parsing untrusted content to restrict deserialization. > > More details: https://nvd.nist.gov/vuln/detail/CVE-2022-1471 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32137) Flame graph is hard to use with many task managers
[ https://issues.apache.org/jira/browse/FLINK-32137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726760#comment-17726760 ] Vladimir Matveev commented on FLINK-32137: -- Hi [~fanrui], I need to clear this up with my management/open source people, but it is likely I will be able to contribute a fix here. I'll try to ping you here soon. > Flame graph is hard to use with many task managers > -- > > Key: FLINK-32137 > URL: https://issues.apache.org/jira/browse/FLINK-32137 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.16.1 >Reporter: Vladimir Matveev >Priority: Major > Attachments: image (1).png, image-2023-05-23-11-01-30-391.png > > > In case there are many task managers executing the same operator, the flame > graph becomes very hard to use. As you can see on the attached picture, it > considers instances of the same lambda function as different classes, and > their number seems to be equal to the number of task managers (i.e. each JVM > gets its own "class" name, which is expected for lambdas I guess). This > lambda function is deep within Flink's own call stack, so this kind of graph > is inevitable regardless of the job's own logic, and there is nothing we can > do at the job logic's level to fix it. > This behavior makes evaluating the flame graph very hard, because all of the > useful information gets "compressed" inside each "column" of the graph, and > at the same time, it does not give any useful information since this is just > an artifact of the class name generation in the JVM. -- This message was sent by Atlassian Jira (v8.20.10#820010)
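One possible mitigation (a sketch of the idea, not what Flink currently does) is to normalize the JVM-generated lambda class names before aggregating stack frames, so that e.g. `MyFn$$Lambda$123/456` from different task managers collapses into a single flame-graph node. The exact name pattern varies by JVM, so the regex below is an assumption:

```java
import java.util.regex.Pattern;

public final class FrameNormalizer {
    // JVM lambda classes look like Outer$$Lambda$<n>/<hash> (or /0x<hex> on
    // newer JVMs); the numeric parts differ per JVM instance, so we strip
    // them to merge frames that are logically the same lambda.
    private static final Pattern LAMBDA =
            Pattern.compile("\\$\\$Lambda\\$\\d+(/0x[0-9a-f]+|/\\d+)?");

    public static String normalize(String frame) {
        return LAMBDA.matcher(frame).replaceAll("\\$\\$Lambda\\$");
    }
}
```

Applied while building the flame graph, all per-JVM lambda "classes" would fold into one column instead of one column per task manager.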
[GitHub] [flink] flinkbot commented on pull request #22668: [BP-1.17][FLINK-28853] Document the split level watermark alignment feature an…
flinkbot commented on PR #22668: URL: https://github.com/apache/flink/pull/22668#issuecomment-1564848214 ## CI report: * 60a2646b4caec19e5a3a470f5d7528009d880246 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] mas-chen opened a new pull request, #22668: [BP-1.17][FLINK-28853] Document the split level watermark alignment feature an…
mas-chen opened a new pull request, #22668: URL: https://github.com/apache/flink/pull/22668 …d fixup grammar from the configuration table ## What is the purpose of the change Update docs for watermark alignment. ## Brief change log - Update docs for watermark alignment - Fixup grammar in Flink configuration ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32206) ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch
[ https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alireza Omidvar updated FLINK-32206: Attachment: (was: image (1)-1.png) > ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch > > > Key: FLINK-32206 > URL: https://issues.apache.org/jira/browse/FLINK-32206 > Project: Flink > Issue Type: Bug > Components: API / Python, Connectors / Kafka, Connectors / MongoDB > Affects Versions: 1.17.0 > Reporter: Alireza Omidvar > Priority: Major > Attachments: image (1).png, image (2).png > > > Gentlemen, > I have a problem with some apache-flink modules. I am running apache-flink 1.17.0 and writing test code in Colab, and I ran into a problem importing the Kafka, Json and FileSystem modules: > > from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime > from pyflink.table.catalog import FileSystem > > These imports do not work for me (Python 3.10), while other modules import fine. Any help is highly appreciated. I checked the GitHub sources and could not find these classes in the official version either, which suggests the modules are no longer inside descriptors.py in newer releases. > > Please see the link below: > > [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] > > > I am running a test after producing the stream ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) to a Confluent server, and I would like to run a Flink job, but the above-mentioned modules are not found in the following Colab notebooks: > > This is probably an easy bug to fix. The only version of apache-flink currently working on Colab is 1.17.0. I would prefer Python 3.10, but I installed a virtual Python 3.8 env and, comparing versions, found that the Kafka and Json modules are not in descriptors.py of apache-flink 1.17 by default, while they do exist in apache-flink 1.13. 
> [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] > [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] > > I've got this error for Json, Kafka ... > --- > > ImportError Traceback (most recent call last) > 1 from pyflink.table import DataTypes > ----> 2 from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime > 3 from pyflink.table.catalog import FileSystem > ImportError: cannot import name 'Kafka' from 'pyflink.table.descriptors' (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) > > --- > > NOTE: If your import is failing due to a missing package, you can manually install dependencies using either !pip or !apt. To view examples of installing some common dependencies, click the "Open Examples" button below. > > --- > > I suspect the error is related to the version and its dependencies, so > I have to ask the developers: if I switch to a Python 3.8 env, could this be solved? > > > Thanks for your time, > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32206) ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch
[ https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alireza Omidvar updated FLINK-32206: Attachment: (was: image (2)-1.png) > ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch > > > Key: FLINK-32206 > URL: https://issues.apache.org/jira/browse/FLINK-32206 > Project: Flink > Issue Type: Bug > Components: API / Python, Connectors / Kafka, Connectors / MongoDB >Affects Versions: 1.17.0 >Reporter: Alireza Omidvar >Priority: Major > Attachments: image (1).png, image (2).png > > > Gentlemen, > I have problem with some apache-flink modules. I am running a 1.17.0 apache- > flink and I write test codes in Colab I faced a problem on importing Kafka, > Json and FileSystem modules > > from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime > from pyflink.table.catalog import FileSystem > > not working for me (python version 3.10) > > Any help is highly appreciated the strange is that other modules importing > fine. I checked with your Github but didn't find these on official version > too which means modules are not inside the descriptor.py in newer version. > > Please see the link below: > > [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] > > > I am running a test after producing the stream > ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) > to Confluent server and I like to do a flink job but the above mentioned > modules are not found with the following links in collab: > > That is probably an easy fix bug. Only version of apache-flink now working on > colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and > between different modules found out that Kafka and Json modules are not in > descriptors.py of version 1.17 Apache-flink default. But modules exist in > Apache-flink 1.13 version. 
> [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] > [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] > > I've got this error for Json, Kafka ... > --- > > ImportError Traceback (most recent call last) > in () 1 from pyflink.table import DataTypes > 2 from > pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from > pyflink.table.catalog import FileSystem ImportError: cannot import name > 'Kafka' from 'pyflink.table.descriptors' > (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) > > --- > > NOTE: If your import is failing due to a missing package, you can manually > install dependencies using either !pip or !apt. To view examples of > installing some common dependencies, click the "Open Examples" button below. > > --- > > I have doubt that if current error is related to a version and dependencies > then > > I have to ask the developer if I do this python 3.8 env is that possible to > get solved? > > > Thanks for your time , > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32206) ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch
[ https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alireza Omidvar updated FLINK-32206: Attachment: image (1)-1.png image (2)-1.png Component/s: Connectors / Kafka Connectors / MongoDB External issue URL: https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb Language: Python Affects Version/s: 1.17.0 (was: mongodb-1.0.1) Description: Gentlemen, I have problem with some apache-flink modules. I am running a 1.17.0 apache- flink and I write test codes in Colab I faced a problem on importing Kafka, Json and FileSystem modules from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime from pyflink.table.catalog import FileSystem not working for me (python version 3.10) Any help is highly appreciated the strange is that other modules importing fine. I checked with your Github but didn't find these on official version too which means modules are not inside the descriptor.py in newer version. Please see the link below: [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] I am running a test after producing the stream ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) to Confluent server and I like to do a flink job but the above mentioned modules are not found with the following links in collab: That is probably an easy fix bug. Only version of apache-flink now working on colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between different modules found out that Kafka and Json modules are not in descriptors.py of version 1.17 Apache-flink default. But modules exist in Apache-flink 1.13 version. 
[https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] I've got this error for Json, Kafka ... --- ImportError Traceback (most recent call last) in () 1 from pyflink.table import DataTypes > 2 from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from pyflink.table.catalog import FileSystem ImportError: cannot import name 'Kafka' from 'pyflink.table.descriptors' (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) --- NOTE: If your import is failing due to a missing package, you can manually install dependencies using either !pip or !apt. To view examples of installing some common dependencies, click the "Open Examples" button below. --- I have doubt that if current error is related to a version and dependencies then I have to ask the developer if I do this python 3.8 env is that possible to get solved? Thanks for your time , was: Gentlemen, I have problem with some apache-flink modules. I am running a 1.17.0 apache- flink and I write test codes in Colab I faced a problem for import modules from pyflink.table import DataTypes from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime from pyflink.table.catalog import FileSystem not working for me (python version 3.10) Any help is highly appreciated the strange is that other modules importing fine. I checked with your Github but didn't find these on yours too which means modules are not inside your descriptor.py too. I think it needed installation of connectors but it failed too. 
Please see the link below: [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] I am running a test after producing the stream ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) to Confluent server and I like to do a flink job but the above mentioned modules are not found with the following links in collab: That is not probably a bug. Only version of apache-flink now working on colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between different modules found out that Kafka and Json modules are not in descriptors.py of version 1.17 Apache-flink default. But modules exist in Apache-flink 1.13 version. [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] I've got this error for Json, Kafka ... ---
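For context on the reported import failure: the connector descriptor classes (`Kafka`, `Json`, etc.) were removed from `pyflink.table.descriptors` in releases after 1.13; in current PyFlink the connector is configured through `TableDescriptor` options instead. A hedged Python sketch of the replacement pattern (topic and server values are placeholders, assuming PyFlink >= 1.14):

```python
# Sketch: in PyFlink >= 1.14 the removed Kafka/Json descriptor classes are
# replaced by TableDescriptor.for_connector(...) with string options.
from pyflink.table import (
    DataTypes,
    EnvironmentSettings,
    Schema,
    TableDescriptor,
    TableEnvironment,
)

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.create_temporary_table(
    "kafka_source",
    TableDescriptor.for_connector("kafka")
    .schema(Schema.new_builder().column("msg", DataTypes.STRING()).build())
    .option("topic", "my-topic")  # placeholder topic
    .option("properties.bootstrap.servers", "localhost:9092")  # placeholder
    .option("scan.startup.mode", "earliest-offset")
    .option("format", "json")
    .build(),
)
```

This also requires the Kafka SQL connector jar on the classpath; the descriptor import itself no longer exists in 1.17, regardless of the Python version used.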
[GitHub] [flink] pgaref commented on pull request #22663: [FLINK-32198][runtime] Enforce single maxExceptions query parameter
pgaref commented on PR #22663: URL: https://github.com/apache/flink/pull/22663#issuecomment-1564774191 > This is an api-breaking change and can't be merged. Thanks for taking a look @zentol! What's the policy for making breaking changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] kurtostfeld commented on pull request #22660: [FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…
kurtostfeld commented on PR #22660: URL: https://github.com/apache/flink/pull/22660#issuecomment-1564710396 @zentol ok, I created a Confluence login with username "kurto", however I don't seem to have permissions to create a new FLIP. I don't see a "Create" button as the docs say: > To create your own FLIP, click on "Create" on the header and choose "FLIP-Template" other than "Blank page". Thank you :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-3154) Update Kryo version from 2.24.0 to latest Kryo LTS version
[ https://issues.apache.org/jira/browse/FLINK-3154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726320#comment-17726320 ] Kurt Ostfeld edited comment on FLINK-3154 at 5/26/23 5:24 PM: -- [~martijnvisser] this PR upgrades Flink from Kryo v2.x to Kryo v5.x and preserves backward compatibility with existing savepoints and checkpoints: [https://github.com/apache/flink/pull/22660] This keeps the Kryo v2 project dependency for backwards compatibility only and otherwise uses Kryo v5.x. EDIT: All CI tests are passing. was (Author: JIRAUSER38): [~martijnvisser] this PR upgrades Flink from Kryo v2.x to Kryo v5.x and preserves backward compatibility with existing savepoints and checkpoints: [https://github.com/apache/flink/pull/22660] This keeps the Kryo v2 project dependency for backwards compatibility only and otherwise uses Kryo v5.x. EDIT: I will fix the CI errors. > Update Kryo version from 2.24.0 to latest Kryo LTS version > -- > > Key: FLINK-3154 > URL: https://issues.apache.org/jira/browse/FLINK-3154 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Priority: Not a Priority > Labels: pull-request-available > > Flink's Kryo version is outdated and could be updated to a newer version, > e.g. kryo-3.0.3. > From ML: we cannot bumping the Kryo version easily - the serialization format > changed (that's why they have a new major version), which would render all > Flink savepoints and checkpoints incompatible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded
[ https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andriy Redko resolved FLINK-32209. -- Resolution: Won't Do > Opensearch connector should remove the dependency on flink-shaded > - > > Key: FLINK-32209 > URL: https://issues.apache.org/jira/browse/FLINK-32209 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Andriy Redko >Assignee: Andriy Redko >Priority: Major > Fix For: opensearch-1.1.0 > > > The Opensearch connector depends on flink-shaded. With the externalization of > the connector, the connectors shouldn't rely on Flink-Shaded -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded
[ https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726691#comment-17726691 ] Andriy Redko commented on FLINK-32209: -- Already fixed by https://github.com/apache/flink-connector-opensearch/commit/85e9cad4f09519543e149530f8a61b2635ca506e > Opensearch connector should remove the dependency on flink-shaded > - > > Key: FLINK-32209 > URL: https://issues.apache.org/jira/browse/FLINK-32209 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Andriy Redko >Assignee: Andriy Redko >Priority: Major > Fix For: opensearch-1.1.0 > > > The Opensearch connector depends on flink-shaded. With the externalization of > the connector, the connectors shouldn't rely on Flink-Shaded -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-jdbc] ewangsdc commented on pull request #3: [FLINK-15462][connectors] Add Trino dialect
ewangsdc commented on PR #3: URL: https://github.com/apache/flink-connector-jdbc/pull/3#issuecomment-1564695132 If the new Flink JDBC connector Trino dialect supports the Trino JDBC driver, i.e., https://trino.io/docs/current/client/jdbc.html & https://github.com/trinodb/docs.trino.io, Kerberos auth options could be added as extra options on top of the current ones, i.e., https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options:~:text=.id%3B-,Connector%20Options,-%23. Like the Spark/PySpark session JDBC options for Trino Kerberos auth, the extra Flink JDBC connection options should include the Kerberos parameters for the above Trino JDBC driver, e.g., 'kerberos_remote_service_name': "", 'kerberos_principal': "", 'kerberos_usecanonicalhostname': , 'driver': 'io.trino.jdbc.TrinoDriver'. Another option is to embed the above Kerberos parameters into the existing connection option url, i.e., the JDBC database url, as one string, e.g., “jdbc:trino:///==False&..", and have the Flink Trino dialect Java code parse this url to extract the Kerberos parameters and pass them to the above Trino JDBC driver. Thanks, Emerson -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
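The idea above can be sketched against the Trino JDBC driver directly. A hedged Java sketch (host, catalog and principal values are placeholders; the property names follow the Trino JDBC driver documentation, and `trino-jdbc` must be on the classpath):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class TrinoKerberosSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "flink");                       // placeholder user
        props.setProperty("SSL", "true");                         // Kerberos auth requires TLS
        props.setProperty("KerberosRemoteServiceName", "trino");  // service principal name
        props.setProperty("KerberosPrincipal", "flink@EXAMPLE.COM"); // placeholder principal
        props.setProperty("KerberosUseCanonicalHostname", "false");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:trino://trino.example.com:443/hive", props)) { // placeholder URL
            // run queries through the Flink JDBC dialect here
        }
    }
}
```

A Flink dialect could either surface these as first-class connector options or, as suggested, accept them inline in the `url` option and pass them through unchanged, since the Trino driver already parses URL parameters itself.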
[jira] [Assigned] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded
[ https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-32209: --- Assignee: Andriy Redko > Opensearch connector should remove the dependency on flink-shaded > - > > Key: FLINK-32209 > URL: https://issues.apache.org/jira/browse/FLINK-32209 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Andriy Redko >Assignee: Andriy Redko >Priority: Major > Fix For: opensearch-1.1.0 > > > The Opensearch connector depends on flink-shaded. With the externalization of > the connector, the connectors shouldn't rely on Flink-Shaded -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa commented on pull request #22666: Update japicmp configuration for 1.16.2
reswqa commented on PR #22666: URL: https://github.com/apache/flink/pull/22666#issuecomment-1564661314 I don't know whether it was missed or skipped for reasons I'm not aware of, but it seems that the 1.16.1 release did not update `japicmp`. If I understand correctly, we should also update it for bug-fix releases. To be safe, could you please help confirm this, @MartijnVisser? Thanks a lot! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31944) Protobuf format throw com.google.protobuf.InvalidProtocolBufferException
[ https://issues.apache.org/jira/browse/FLINK-31944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726681#comment-17726681 ] Ryan Skraba commented on FLINK-31944: - This is a duplicate of FLINK-32008 – one of the serialized messages is: {code:java} 0a 03 61 62 63 0a {code} (Note the first **0a** byte that isn't a newline, and the second **0a** byte that was added to the record and probably shouldn't be there.) > Protobuf format throw com.google.protobuf.InvalidProtocolBufferException > > > Key: FLINK-31944 > URL: https://issues.apache.org/jira/browse/FLINK-31944 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.0 >Reporter: Xuannan Su >Priority: Major > Attachments: flink-protobuf-example.zip > > > It seems that protobuf format throws the following exception when the first > field of the message is string type. This may also occur for other types. I > uploaded the maven project to reproduce the problem. > > {code:java} > Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: java.io.IOException: Failed to deserialize PB object. 
> at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124) > at > org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82) > at > org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > ... 6 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129) > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70) > ... 
15 more > Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol > message contained an invalid tag (zero). > at > com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:133) > at > com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:633) > at com.example.proto.Message.(Message.java:47) > at com.example.proto.Message.(Message.java:9) > at com.example.proto.Message$1.parsePartialFrom(Message.java:540) > at com.example.proto.Message$1.parsePartialFrom(Message.java:534) > at > com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208) > at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) > at
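The root cause hinted at in the comment above can be sketched without Flink at all: in the protobuf wire format, the tag byte for field number 1 with wire type 2 (length-delimited, e.g. a string) is `(1 << 3) | 2 = 0x0A`, which is also the ASCII newline character. A newline-splitting reader therefore cuts serialized records apart. A minimal, hypothetical illustration (no protobuf dependency; the byte values simply follow the wire-format rules):

```java
public class ProtoNewlineClash {
    public static void main(String[] args) {
        // Serialized protobuf message with field 1 = "abc":
        // tag 0x0A (field 1, wire type 2), length 3, then the bytes "abc".
        byte[] record = {0x0A, 0x03, 'a', 'b', 'c'};

        // The tag byte equals the ASCII newline character...
        System.out.println(record[0] == '\n'); // prints "true"

        // ...so a reader that splits input on '\n' cuts the record at
        // offset 0 and hands the protobuf parser a fragment that starts
        // mid-record, which then fails with errors such as
        // "Protocol message contained an invalid tag (zero)".
    }
}
```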
[jira] [Comment Edited] (FLINK-32008) Protobuf format cannot work with FileSystem Connector
[ https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726567#comment-17726567 ] Ryan Skraba edited comment on FLINK-32008 at 5/26/23 4:40 PM: -- It's probably worth checking in with the community to see what we expect of a bulk format, or if it would be an interesting thing to add! [I'll ask on the mailing list|https://lists.apache.org/thread/z9tqdqrhj12c17wqsdbm5fhzonqq5kp0]. I took a quick look but didn't see any related JIRA (outside of FLINK-12149, which proposes using the protobuf API to interact with parquet files). Can a committer change the title of this JIRA to better reflect the issue? Something like "Protobuf format on filesystem is faulty" was (Author: ryanskraba): It's probably worth checking in with the community to see what we expect of a bulk format, or if it would be an interesting thing to add! I'll ask on the mailing list. I took a quick look but didn't see any related JIRA (outside of FLINK-12149, which proposes using the protobuf API to interact with parquet files). Can a committer change the title of this JIRA to better reflect the issue? Something like "Protobuf format on filesystem is faulty" > Protobuf format cannot work with FileSystem Connector > - > > Key: FLINK-32008 > URL: https://issues.apache.org/jira/browse/FLINK-32008 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.0 >Reporter: Xuannan Su >Priority: Major > Attachments: flink-protobuf-example.zip > > > The protobuf format throws exception when working with Map data type. I > uploaded a example project to reproduce the problem. 
> > {code:java} > Caused by: java.lang.RuntimeException: One or more fetchers have encountered > exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: java.io.IOException: Failed to deserialize PB object. > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) > at >
[jira] [Created] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded
Andriy Redko created FLINK-32209: Summary: Opensearch connector should remove the dependency on flink-shaded Key: FLINK-32209 URL: https://issues.apache.org/jira/browse/FLINK-32209 Project: Flink Issue Type: Bug Components: Connectors / Opensearch Affects Versions: opensearch-1.0.1 Reporter: Andriy Redko Fix For: opensearch-1.1.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded
[ https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andriy Redko updated FLINK-32209: - Description: The Opensearch connector depends on flink-shaded. With the externalization of the connector, the connectors shouldn't rely on Flink-Shaded > Opensearch connector should remove the dependency on flink-shaded > - > > Key: FLINK-32209 > URL: https://issues.apache.org/jira/browse/FLINK-32209 > Project: Flink > Issue Type: Bug > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Andriy Redko >Priority: Major > Fix For: opensearch-1.1.0 > > > The Opensearch connector depends on flink-shaded. With the externalization of > the connector, the connectors shouldn't rely on Flink-Shaded -- This message was sent by Atlassian Jira (v8.20.10#820010)
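For connectors moving off flink-shaded, the usual alternative is to shade such dependencies directly in the connector build. A hypothetical `pom.xml` fragment (the artifact to bundle and the relocation package are illustrative, not taken from the actual Opensearch connector build):

```xml
<!-- Illustrative maven-shade-plugin fragment: bundle and relocate a
     dependency in the connector itself instead of relying on flink-shaded. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <artifactSet>
          <includes>
            <!-- hypothetical dependency to bundle -->
            <include>com.google.guava:guava</include>
          </includes>
        </artifactSet>
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>org.apache.flink.connector.opensearch.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Relocating the packages keeps the bundled classes from clashing with whatever version of the same library a user has on the classpath, which is the main service flink-shaded provided.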
[jira] [Updated] (FLINK-32200) OrcFileSystemITCase crashed with exit code 239 (NoClassDefFoundError: scala/concurrent/duration/Deadline)
[ https://issues.apache.org/jira/browse/FLINK-32200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-32200: Labels: test-stability (was: ) > OrcFileSystemITCase cashed with exit code 239 (NoClassDefFoundError: > scala/concurrent/duration/Deadline) > > > Key: FLINK-32200 > URL: https://issues.apache.org/jira/browse/FLINK-32200 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49325=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=12302 > {code} > 12:24:14,883 [flink-akka.actor.internal-dispatcher-2] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'flink-akka.actor.internal-dispatcher-2' produced an uncaught > exception. Stopping the process... > java.lang.NoClassDefFoundError: scala/concurrent/duration/Deadline > at scala.concurrent.duration.Deadline$.apply(Deadline.scala:30) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at scala.concurrent.duration.Deadline$.now(Deadline.scala:76) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.actor.CoordinatedShutdown.loop$1(CoordinatedShutdown.scala:737) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.actor.CoordinatedShutdown.$anonfun$run$7(CoordinatedShutdown.scala:762) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > 
~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) > [flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > [?:1.8.0_292] > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > [?:1.8.0_292] > at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > [?:1.8.0_292] > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > [?:1.8.0_292] > 12:24:14,882 [flink-metrics-akka.actor.internal-dispatcher-2] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'flink-metrics-akka.actor.internal-dispatcher-2' produced an uncaught > exception. Stopping the process... 
> java.lang.NoClassDefFoundError: scala/concurrent/duration/Deadline > at scala.concurrent.duration.Deadline$.apply(Deadline.scala:30) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at scala.concurrent.duration.Deadline$.now(Deadline.scala:76) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.actor.CoordinatedShutdown.loop$1(CoordinatedShutdown.scala:737) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at > akka.actor.CoordinatedShutdown.$anonfun$run$7(CoordinatedShutdown.scala:762) > ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT] > at
[GitHub] [flink-connector-aws] snuyanzin commented on pull request #75: [FLINK-32208] Remove dependency on flink-shaded from flink-connector-aws
snuyanzin commented on PR #75: URL: https://github.com/apache/flink-connector-aws/pull/75#issuecomment-1564624709 hm... that's weird... almost the same command passes locally ``` mvn clean install -DskipTests -Dflink.convergence.phase=install -Pcheck-convergence -U -B \ -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 \ -DaltDeploymentRepository=validation_repository::default::file:/tmp/flink-validation-deployment \ -Dflink.version=1.16.1 ```
[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
WencongLiu commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206929777 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala: ## @@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object JoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: Sorry for the misunderstanding. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32206) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch
[ https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alireza Omidvar updated FLINK-32206: Attachment: image (2).png image (1).png > Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version > mismatch > -- > > Key: FLINK-32206 > URL: https://issues.apache.org/jira/browse/FLINK-32206 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: mongodb-1.0.1 >Reporter: Alireza Omidvar >Priority: Major > Attachments: image (1).png, image (2).png > > > Gentlemen, > > > > I have problem with some apache-flink modules. I am running a 1.17.0 apache- > flink and I write test codes in Colab I faced a problem for import modules > > > > > from pyflink.table import DataTypes > > from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime > > from pyflink.table.catalog import FileSystem > > > > > not working for me (python version 3.10) > > > Any help is highly appreciated the strange is that other modules importing > fine. I checked with your Github but didn't find these on yours too which > means modules are not inside your descriptor.py too. I think it needed > installation of connectors but it failed too. > > > > > Please see the link below: > > > > > > [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] > > > > > I am running a test after producing the stream > ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) > to Confluent server and I like to do a flink job but the above mentioned > modules are not found with the following links in collab: > > > That is not probably a bug. Only version of apache-flink now working on colab > is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between > different modules found out that Kafka and Json modules are not in > descriptors.py of version 1.17 Apache-flink default. 
But modules exist in > Apache-flink 1.13 version. > [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] > [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] > > I've got this error for Json, Kafka ... > --- > > ImportError Traceback (most recent call last) > in () 1 from pyflink.table import DataTypes > 2 from > pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from > pyflink.table.catalog import FileSystem ImportError: cannot import name > 'Kafka' from 'pyflink.table.descriptors' > (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) > > --- > > NOTE: If your import is failing due to a missing package, you can manually > install dependencies using either !pip or !apt. To view examples of > installing some common dependencies, click the "Open Examples" button below. > > --- > > I have doubt that if current error is related to a version and dependencies > then > > I have to ask the developer if I do this python 3.8 env is that possible to > get solved? > > > Thanks for your time , > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32208) Remove dependency on flink-shaded from flink-connector-aws
[ https://issues.apache.org/jira/browse/FLINK-32208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32208: --- Labels: pull-request-available (was: ) > Remove dependency on flink-shaded from flink-connector-aws > -- > > Key: FLINK-32208 > URL: https://issues.apache.org/jira/browse/FLINK-32208 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / AWS, Connectors / DynamoDB >Affects Versions: aws-connector-4.2.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > The AWS connectors depend on flink-shaded. With the externalization of > connector, connectors shouldn't rely on Flink-Shaded but instead shade > dependencies such as this one themselves -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] boring-cyborg[bot] commented on pull request #75: [FLINK-32208] Remove dependency on flink-shaded from flink-connector-aws
boring-cyborg[bot] commented on PR #75: URL: https://github.com/apache/flink-connector-aws/pull/75#issuecomment-1564515776 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
[jira] [Updated] (FLINK-32206) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch
[ https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alireza Omidvar updated FLINK-32206: Description: Gentlemen, I have problem with some apache-flink modules. I am running a 1.17.0 apache- flink and I write test codes in Colab I faced a problem for import modules from pyflink.table import DataTypes from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime from pyflink.table.catalog import FileSystem not working for me (python version 3.10) Any help is highly appreciated the strange is that other modules importing fine. I checked with your Github but didn't find these on yours too which means modules are not inside your descriptor.py too. I think it needed installation of connectors but it failed too. Please see the link below: [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] I am running a test after producing the stream ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) to Confluent server and I like to do a flink job but the above mentioned modules are not found with the following links in collab: That is not probably a bug. Only version of apache-flink now working on colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between different modules found out that Kafka and Json modules are not in descriptors.py of version 1.17 Apache-flink default. But modules exist in Apache-flink 1.13 version. [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] I've got this error for Json, Kafka ... 
--- ImportError Traceback (most recent call last) in () 1 from pyflink.table import DataTypes > 2 from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from pyflink.table.catalog import FileSystem ImportError: cannot import name 'Kafka' from 'pyflink.table.descriptors' (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) --- NOTE: If your import is failing due to a missing package, you can manually install dependencies using either !pip or !apt. To view examples of installing some common dependencies, click the "Open Examples" button below. --- I have doubt that if current error is related to a version and dependencies then I have to ask the developer if I do this python 3.8 env is that possible to get solved? Thanks for your time , > Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version > mismatch > -- > > Key: FLINK-32206 > URL: https://issues.apache.org/jira/browse/FLINK-32206 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: mongodb-1.0.1 >Reporter: Alireza Omidvar >Priority: Major > > Gentlemen, > > > > I have problem with some apache-flink modules. I am running a 1.17.0 apache- > flink and I write test codes in Colab I faced a problem for import modules > > > > > from pyflink.table import DataTypes > > from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime > > from pyflink.table.catalog import FileSystem > > > > > not working for me (python version 3.10) > > > Any help is highly appreciated the strange is that other modules importing > fine. I checked with your Github but didn't find these on yours too which > means modules are not inside your descriptor.py too. I think it needed > installation of connectors but it failed too. 
> > > > > Please see the link below: > > > > > > [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] > > > > > I am running a test after producing the stream > ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) > to Confluent server and I like to do a flink job but the above mentioned > modules are not found with the following links in collab: > > > That is not probably a bug. Only version of apache-flink now working on colab > is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between > different modules found out that Kafka and Json modules are not in > descriptors.py of version 1.17 Apache-flink default. But modules exist in > Apache-flink 1.13 version. >
[jira] [Created] (FLINK-32208) Remove dependency on flink-shaded from flink-connector-aws
Sergey Nuyanzin created FLINK-32208: --- Summary: Remove dependency on flink-shaded from flink-connector-aws Key: FLINK-32208 URL: https://issues.apache.org/jira/browse/FLINK-32208 Project: Flink Issue Type: Technical Debt Components: Connectors / AWS, Connectors / DynamoDB Affects Versions: aws-connector-4.2.0 Reporter: Sergey Nuyanzin Assignee: Sergey Nuyanzin The AWS connectors depend on flink-shaded. With the externalization of connectors, they shouldn't rely on flink-shaded but should instead shade dependencies such as this one themselves
[GitHub] [flink] mxm merged pull request #22531: [FLINK-28853] Document the split level watermark alignment feature an…
mxm merged PR #22531: URL: https://github.com/apache/flink/pull/22531
[GitHub] [flink] XComp commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
XComp commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1206745426 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ## @@ -65,33 +71,46 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); -this.checkpointedState = -context.getOperatorStateStore() -.getListState( -new ListStateDescriptor<>( -name + "-sequence-state", LongSerializer.INSTANCE)); -this.valuesToEmit = new ArrayDeque<>(); -if (context.isRestored()) { -// upon restoring +ListStateDescriptor stateDescriptor = +new ListStateDescriptor<>( +name + "-sequence-state", TypeInformation.of(InternalState.class)); +this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor); +this.internalStates = Lists.newArrayList(); -for (Long v : this.checkpointedState.get()) { -this.valuesToEmit.add(v); -} +if (context.isRestored()) { +checkpointedState.get().forEach(state -> internalStates.add(state)); } else { -// the first time the job is executed -final int stepSize = runtimeContext.getNumberOfParallelSubtasks(); +// The first time the job is executed. final int taskIdx = runtimeContext.getIndexOfThisSubtask(); -final long congruence = start + taskIdx; - -long totalNoOfElements = Math.abs(end - start + 1); -final int baseSize = safeDivide(totalNoOfElements, stepSize); -final int toCollect = -(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize; +final long stepSize = runtimeContext.getNumberOfParallelSubtasks(); +InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx); +internalStates.add(state); +} +} -for (long collected = 0; collected < toCollect; collected++) { -this.valuesToEmit.add(collected * stepSize + congruence); +public Long nextValue() { +Iterator iterator = internalStates.iterator(); Review Comment: Thanks for sharing your view, @RyanSkraba. 
I think you're right - the implementation in master (i.e. without this PR's change) doesn't support reducing the parallelism properly, either. > Avoid performance skew after reducing parallelism. Currently, going from N tasks to N-1 tasks will double the length of time it takes, since one task will have twice as many values to emit as the others. This is an undesirable consequence of this PR, but can probably also be discussed and implemented in a subsequent step. I don't understand that part: isn't the performance skew you're describing more general and not really connected to the `DataGenerator`? The skew you're mentioning happens in any case when the parallelism is reduced. It's more a question of how Flink would distribute the operator state after the parallelism was reduced. :thinking: Therefore, I don't see this as a problem for this PR.
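The skew discussed in this review can be made concrete with a small standalone sketch (not the actual Flink `SequenceGenerator`, just the same interleaving idea): each subtask emits every `parallelism`-th value of the range, so after naively merging state from N subtasks into N-1, one task owns two slices of the range and takes roughly twice as long.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {
    // Values of [start, end] assigned to one subtask when the range is
    // interleaved with a step equal to the parallelism.
    static List<Long> valuesForTask(long start, long end, int parallelism, int taskIdx) {
        long total = end - start + 1;
        long base = total / parallelism;
        long extra = (total % parallelism > taskIdx) ? 1 : 0;
        List<Long> out = new ArrayList<>();
        for (long k = 0; k < base + extra; k++) {
            out.add(start + taskIdx + k * parallelism);
        }
        return out;
    }

    public static void main(String[] args) {
        // With 10 values and 2 subtasks, each emits 5 interleaved values.
        System.out.println(valuesForTask(0, 9, 2, 0)); // [0, 2, 4, 6, 8]
        System.out.println(valuesForTask(0, 9, 2, 1)); // [1, 3, 5, 7, 9]
        // Dropping to 1 subtask means that task now owns all 10 values:
        // twice the per-task load, i.e. the skew discussed in the review.
        System.out.println(valuesForTask(0, 9, 1, 0).size()); // 10
    }
}
```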
[GitHub] [flink] XComp commented on a diff in pull request #22341: [FLINK-27204] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
XComp commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1206775003 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,69 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. + * @return CompletableFuture with the completed state. * @throws IOException if the creation of the dirty result failed for IO reasons. * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} * attached that is already registered in this {@code JobResultStore}. */ -void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException; +CompletableFuture createDirtyResultAsync(JobResultEntry jobResultEntry); /** * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more * resource cleanup steps need to be performed. No actions should be triggered if the passed * {@code JobID} belongs to a job that was already marked as clean. * * @param jobId Ident of the job we wish to mark as clean. + * @return CompletableFuture with the completed state. * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean} * failed for IO reasons. * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the * store for the given {@code JobID}. Review Comment: The `IOException` can be removed from the javaDoc now. It's going to be wrapped by the `CompletableFuture`. But we might want to mentioned the `NoSuchElementException` as part of the return value's documentation, still. 
## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,69 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. + * @return CompletableFuture with the completed state. * @throws IOException if the creation of the dirty result failed for IO reasons. * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} * attached that is already registered in this {@code JobResultStore}. Review Comment: The IOException can be removed from the javaDoc now. It's going to be wrapped by the CompletableFuture. But we might want to mentioned the `IllegalStateException` as part of the return value's documentation, still. 
## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java: ## @@ -44,64 +45,84 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @Override -public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException { -Preconditions.checkState( -!hasJobResultEntry(jobResultEntry.getJobId()), -"Job result store already contains an entry for job %s", -jobResultEntry.getJobId()); - -withWriteLock(() -> createDirtyResultInternal(jobResultEntry)); +public CompletableFuture createDirtyResultAsync(JobResultEntry jobResultEntry) { +return hasJobResultEntryAsync(jobResultEntry.getJobId()) +.handle( +(hasResult, error) -> { +if (error != null || hasResult) { +ExceptionUtils.rethrow(error); +} +try { +withWriteLock(() -> createDirtyResultInternal(jobResultEntry)); Review Comment: You have to be careful here: The internal methods are now only triggering the actual logic in the `ioExecutor`. Therefore, only submitting the task is guarded under the lock. You're losing the synchronization of the IO tasks which will run in multiple threads of the ioExecutor for the `FileSystemJobResultStore` implementation.
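The pitfall XComp points out can be reduced to a small runnable sketch (a simplified model, not the actual Flink code): if the write lock is held only while *submitting* a task to an executor, the task bodies themselves are no longer serialized and can overlap on the executor's threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockScopeDemo {
    // Simplified model of the reviewed code: the lock guards the submission,
    // not the IO work, so IO tasks may run concurrently on ioExecutor threads.
    static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    static final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
    static final AtomicInteger inFlight = new AtomicInteger();
    static volatile int maxObserved = 0;

    static CompletableFuture<Void> submitGuarded(Runnable ioTask) {
        lock.writeLock().lock();
        try {
            // Only this submit call is serialized; ioTask runs later, unguarded.
            return CompletableFuture.runAsync(ioTask, ioExecutor);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        Runnable io = () -> {
            int now = inFlight.incrementAndGet();
            if (now > maxObserved) {
                maxObserved = now;
            }
            try {
                Thread.sleep(50); // simulate slow file-system IO
            } catch (InterruptedException ignored) {
            }
            inFlight.decrementAndGet();
        };
        CompletableFuture.allOf(submitGuarded(io), submitGuarded(io)).join();
        // With two executor threads, both "guarded" tasks can be in flight at once.
        System.out.println("max concurrent IO tasks: " + maxObserved);
        ioExecutor.shutdown();
    }
}
```

To keep the IO tasks mutually exclusive, the lock acquisition would have to happen inside the task running on the executor (or the store would need a single-threaded executor), which is the gist of the review comment.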
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Priority: Minor (was: Major) > Potential ClassLoader memory leak due to log4j configuration > > > Key: FLINK-32203 > URL: https://issues.apache.org/jira/browse/FLINK-32203 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Minor > Labels: pull-request-available > Attachments: classloader_leak.png, > stack_trace_example_with_log4j_creation_on_job_reload.log > > > *Context* > We have encountered a memory leak related to ClassLoaders in Apache Flink. > ChildFirstClassLoader is not properly garbage collected when the job is > restarted. > A heap dump has shown that Log4j starts a configuration watch thread, which > then holds a strong reference to ChildFirstClassLoader via AccessControlContext. > Since the thread is never stopped, ChildFirstClassLoader is never cleaned up. > Removing monitorInterval, introduced in FLINK-20510, helps to mitigate the > issue; I believe it could be applied to the log4j config by default. > *How to reproduce* > Deploy a Flink job which uses the Hadoop File System (e.g. s3a). Redeploy the job > -> in the Task Manager dump you should see multiple Log4jThreads > *AC* > We have a configuration which doesn't easily lead to a memory leak with the default > configuration for Flink users. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] echauchot commented on pull request #22667: [FLINK-31806] Fix public API rule for connectors
echauchot commented on PR #22667: URL: https://github.com/apache/flink/pull/22667#issuecomment-1564460268 Tested it on Cassandra connector and it reports these violations: 1. If I put back IOUtils use in the connector it raises `Method calls method in (CassandraEnumeratorStateSerializer.java:90) ` => This is what we wanted 2. On current code base, it raises: `java.lang.AssertionError: Architecture Violation [Priority: MEDIUM] - Rule 'Connector production code must not depend on non-public API outside of connector packages' was violated (16 times): Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:138) Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:124) Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:125) Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:126) Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method in (CassandraSource.java:127) Constructor (org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> gets field in (CassandraSource.java:138) Field has generic type > with type argument depending on in (CassandraSplitSerializer.java:0) 
Method calls method in (CassandraSource.java:145) Method calls method in (CassandraSource.java:149) Method is annotated with in (CassandraSource.java:0) Method is annotated with in (CassandraSplitReader.java:0) Method calls constructor ([B)> in (CassandraSplitSerializer.java:57) Method calls method in (CassandraSplitSerializer.java:51) Method calls method in (CassandraSplitSerializer.java:50) Method is annotated with in (SplitsGenerator.java:0) Static Initializer ()> calls constructor (int)> in (CassandraSplitSerializer.java:34) ` => These are calls to ClosureCleaner, checkState and checkNotNull, VisibleForTesting which should be added as violation exceptions => The only debatable part is the dependency on `org.apache.flink.core.memory.DataInputDeserializer` and `org.apache.flink.core.memory.DataOutputSerializer` which are internal APIs, but we could definitely use the [ByteArray | Object]Streams JDK counterparts instead in the Cassandra code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32207) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch
Alireza Omidvar created FLINK-32207: --- Summary: Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch Key: FLINK-32207 URL: https://issues.apache.org/jira/browse/FLINK-32207 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.17.0 Environment: Colab Python3.10 Reporter: Alireza Omidvar Attachments: image (1).png, image (2).png Gentlemen, I have a problem with some apache-flink modules. I am running apache-flink 1.17.0 and writing test code in Colab, and I ran into a problem with imports: from pyflink.table import DataTypes; from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime; from pyflink.table.catalog import FileSystem are not working for me (Python version 3.10). Any help is highly appreciated. The strange thing is that other modules import fine. I checked your GitHub and didn't find these classes there either, which means the modules are not inside your descriptors.py. I thought it needed installation of connectors, but that failed too. Please see the link below: [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb] I am running a test after producing the stream ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb]) to a Confluent server, and I would like to run a Flink job, but the above-mentioned modules are not found with the following links in Colab. This is probably not a bug. The only version of apache-flink currently working on Colab is 1.17.0. I prefer 3.10, but I installed a virtual Python 3.8 env and, comparing versions, found out that the Kafka and Json modules are not in descriptors.py of the apache-flink 1.17 default, though the modules do exist in apache-flink 1.13. [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing] [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing] I've got this error for Json, Kafka ... 
--- ImportError Traceback (most recent call last) in () 1 from pyflink.table import DataTypes > 2 from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from pyflink.table.catalog import FileSystem ImportError: cannot import name 'Kafka' from 'pyflink.table.descriptors' (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) --- NOTE: If your import is failing due to a missing package, you can manually install dependencies using either !pip or !apt. To view examples of installing some common dependencies, click the "Open Examples" button below. --- I suspect the current error is related to versions and dependencies, so I have to ask the developers: if I set up this Python 3.8 env, can the problem be solved? Thanks for your time, -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32206) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch
Alireza Omidvar created FLINK-32206: --- Summary: Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch Key: FLINK-32206 URL: https://issues.apache.org/jira/browse/FLINK-32206 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: mongodb-1.0.1 Reporter: Alireza Omidvar -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] RyanSkraba commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
RyanSkraba commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1206825892 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ## @@ -65,33 +71,46 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); -this.checkpointedState = -context.getOperatorStateStore() -.getListState( -new ListStateDescriptor<>( -name + "-sequence-state", LongSerializer.INSTANCE)); -this.valuesToEmit = new ArrayDeque<>(); -if (context.isRestored()) { -// upon restoring +ListStateDescriptor stateDescriptor = +new ListStateDescriptor<>( +name + "-sequence-state", TypeInformation.of(InternalState.class)); +this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor); +this.internalStates = Lists.newArrayList(); -for (Long v : this.checkpointedState.get()) { -this.valuesToEmit.add(v); -} +if (context.isRestored()) { +checkpointedState.get().forEach(state -> internalStates.add(state)); } else { -// the first time the job is executed -final int stepSize = runtimeContext.getNumberOfParallelSubtasks(); +// The first time the job is executed. final int taskIdx = runtimeContext.getIndexOfThisSubtask(); -final long congruence = start + taskIdx; - -long totalNoOfElements = Math.abs(end - start + 1); -final int baseSize = safeDivide(totalNoOfElements, stepSize); -final int toCollect = -(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize; +final long stepSize = runtimeContext.getNumberOfParallelSubtasks(); +InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx); +internalStates.add(state); +} +} -for (long collected = 0; collected < toCollect; collected++) { -this.valuesToEmit.add(collected * stepSize + congruence); +public Long nextValue() { +Iterator iterator = internalStates.iterator(); Review Comment: @xuzhiwen1255 This sounds perfect! 
> But I have an idea, that is, for each subtask, we let him generate more InternalState That is to say, instead of having N internal states when there are N tasks initially, we would have 2N, or 10N, or 100N. That number doesn't have to be extremely high to avoid skew here, and still benefit from the "concise" and efficient InternalState you've implemented here. I think there might be some cool way of splitting up the ranges differently, but I'd like to think about it! This should probably go on the future JIRA though :D @XComp Yes, this issue would happen to any source that created "one split per task" on initialization, where the split corresponds to the operator state -- but the previous implementation didn't do that. It was one state per element to emit, so it _could_ be perfectly rebalanced (but overall pretty inefficiently). I agree that it's not a problem for this PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
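The "k·N internal states" idea discussed above can be sketched as follows (a hypothetical illustration, not the PR's actual `SequenceGenerator`/`InternalState` code): give each of N subtasks k arithmetic sequences with a common step of k·N, so that redistributing states after a parallelism change produces much smaller skew while every value in the range is still emitted exactly once.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Each state is (offset, step) and emits offset, offset+step, offset+2*step, ...
    // up to the inclusive end of the range.
    static List<long[]> statesForSubtask(long start, int parallelism, int k, int subtask) {
        List<long[]> states = new ArrayList<>();
        long step = (long) parallelism * k; // total number of interleaved sequences
        for (int i = 0; i < k; i++) {
            long offset = start + subtask + (long) i * parallelism;
            states.add(new long[] {offset, step});
        }
        return states;
    }

    // Emit the whole range by unioning all subtasks' sequences (for verification).
    static List<Long> emitAll(long start, long end, int parallelism, int k) {
        List<Long> out = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            for (long[] s : statesForSubtask(start, parallelism, k, t)) {
                for (long v = s[0]; v <= end; v += s[1]) {
                    out.add(v);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 3 subtasks, 4 states each -> 12 interleaved sequences covering [0, 99].
        List<Long> values = emitAll(0, 99, 3, 4);
        System.out.println(values.size()); // 100: each value exactly once
    }
}
```

With k states per subtask, going from N to N-1 subtasks leaves at most one extra state per survivor instead of doubling one subtask's share, which is the skew-reduction argument above.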
[GitHub] [flink] reswqa commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
reswqa commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206813779 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala: ## @@ -470,7 +469,7 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object SemiJoinITCase { - @Parameterized.Parameters(name = "{0}-{1}") + @Parameters(name = "{0}-{1}") Review Comment: Then we should fix it. Please try to give it a more appropriate name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
reswqa commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206804563 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala: ## @@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object JoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: I don't quite understand why modifying the string (the value of name) wouldn't compile. What you should modify is the value of name, not the attribute itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
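For readers following the naming discussion: the `name` attribute of `@Parameters` is a MessageFormat-style pattern over the test parameters, so the suggested fix is to label the placeholder inside the string, not to rename the attribute. A small sketch of how such patterns render (`render` is a hypothetical helper; the real rendering is done by the test framework):

```java
import java.text.MessageFormat;

public class DisplayNamePattern {
    // The pattern is evaluated with the parameterized-test arguments; "{0}" is
    // the first parameter, "{1}" the second, and so on.
    public static String render(String pattern, Object... params) {
        return MessageFormat.format(pattern, params);
    }

    public static void main(String[] args) {
        System.out.println(render("{0}", "INNER"));                  // INNER
        System.out.println(render("expectedJoinType={0}", "INNER")); // expectedJoinType=INNER
    }
}
```

So changing `name = "{0}"` to something like `name = "expectedJoinType={0}"` compiles fine and yields a more descriptive test display name, which appears to be what the reviewer is asking for.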
[GitHub] [flink] flinkbot commented on pull request #22667: [FLINK-31806] Fix public API rule for connectors
flinkbot commented on PR #22667: URL: https://github.com/apache/flink/pull/22667#issuecomment-1564407238 ## CI report: * 21e34a00657502a6a502e436152d5cb01aad4872 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31806) Prod architecture tests didn't detect non-public API usage
[ https://issues.apache.org/jira/browse/FLINK-31806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31806: --- Labels: pull-request-available (was: ) > Prod architecture tests didn't detect non-public API usage > -- > > Key: FLINK-31806 > URL: https://issues.apache.org/jira/browse/FLINK-31806 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Cassandra, Tests >Affects Versions: cassandra-3.0.0, 1.18.0 >Reporter: Chesnay Schepler >Assignee: Etienne Chauchot >Priority: Critical > Labels: pull-request-available > > FLINK-31805 wasn't detected by the production architecture tests. > Not sure if this is an issue on the cassandra or Flink side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726626#comment-17726626 ] Oleksandr Nitavskyi commented on FLINK-32203: - [~chesnay] thanks for looking into the PR (https://github.com/apache/flink/pull/22664). You can find attached an example of the stack trace which we get when the Log4jThread is created. We ran the job and killed one of the JobManagers to rely on HA and trigger a job restart. While debugging the Log4jThread creation, we saw in the stack trace the Presto (for checkpoints) and Hadoop S3A (to write output to S3) FileSystems, which are loaded from the plugin ClassLoader (an example stack trace is attached). Do you know whether a plugin ClassLoader instance is created per job when a job is created? If yes, this instance is probably passed to Log4jContextFactory, and thus a new Log4j subsystem is created. > Potential ClassLoader memory leak due to log4j configuration > > > Key: FLINK-32203 > URL: https://issues.apache.org/jira/browse/FLINK-32203 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Labels: pull-request-available > Attachments: classloader_leak.png, > stack_trace_example_with_log4j_creation_on_job_reload.log > > > *Context* > We have encountered a memory leak related to ClassLoaders in Apache Flink. > ChildFirstClassLoader is not properly garbage collected when the job is > restarted. > A heap dump has shown that Log4j starts a configuration watch thread, which > then holds a strong reference to ChildFirstClassLoader via AccessControlContext. > Since the thread is never stopped, ChildFirstClassLoader is never cleaned up. > Removing monitorInterval, introduced in FLINK-20510, helps to mitigate the > issue; I believe it could be applied to the log4j config by default. > *How to reproduce* > Deploy a Flink job which uses the Hadoop File System (e.g. s3a). 
Redeploy the job > -> in the Task Manager dump you should see multiple Log4jThreads > *AC* > We have a configuration which doesn't easily lead to a memory leak with the default > configuration for Flink users. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-jdbc] matriv commented on pull request #29: [FLINK-31551] Add support for CrateDB
matriv commented on PR #29: URL: https://github.com/apache/flink-connector-jdbc/pull/29#issuecomment-1564380946 @MartijnVisser @snuyanzin I've rebased again, can you please run ci again? It would be great to merge it if everything passes, thx. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…
XComp commented on code in PR #22010: URL: https://github.com/apache/flink/pull/22010#discussion_r1206745426 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java: ## @@ -65,33 +71,46 @@ public void open( this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); -this.checkpointedState = -context.getOperatorStateStore() -.getListState( -new ListStateDescriptor<>( -name + "-sequence-state", LongSerializer.INSTANCE)); -this.valuesToEmit = new ArrayDeque<>(); -if (context.isRestored()) { -// upon restoring +ListStateDescriptor stateDescriptor = +new ListStateDescriptor<>( +name + "-sequence-state", TypeInformation.of(InternalState.class)); +this.checkpointedState = context.getOperatorStateStore().getListState(stateDescriptor); +this.internalStates = Lists.newArrayList(); -for (Long v : this.checkpointedState.get()) { -this.valuesToEmit.add(v); -} +if (context.isRestored()) { +checkpointedState.get().forEach(state -> internalStates.add(state)); } else { -// the first time the job is executed -final int stepSize = runtimeContext.getNumberOfParallelSubtasks(); +// The first time the job is executed. final int taskIdx = runtimeContext.getIndexOfThisSubtask(); -final long congruence = start + taskIdx; - -long totalNoOfElements = Math.abs(end - start + 1); -final int baseSize = safeDivide(totalNoOfElements, stepSize); -final int toCollect = -(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize; +final long stepSize = runtimeContext.getNumberOfParallelSubtasks(); +InternalState state = new InternalState(taskIdx, stepSize, start + taskIdx); +internalStates.add(state); +} +} -for (long collected = 0; collected < toCollect; collected++) { -this.valuesToEmit.add(collected * stepSize + congruence); +public Long nextValue() { +Iterator iterator = internalStates.iterator(); Review Comment: Thanks for sharing your view, @RyanSkraba. 
I think you're right - the implementation in master (i.e. without this PR's change) doesn't support reducing the parallelism properly, either. ``` Avoid performance skew after reducing parallelism. Currently, going from N tasks to N-1 tasks will double the length of time it takes, since one task will have twice as many values to emit as the others. This is a undesirable consequence of this PR, but can probably also be discussed and implemented in a subsequent step. ``` I don't understand that part: Isn't the performance skew issue you're describing more general and not really connected to the `DataGenerator`? The skew you're mentioning happens in any case when reducing the parallelism. It's more of a question of how Flink would distribute the operator state after the parallelism was reduced. :thinking: Therefore, I don't see this as a problem for this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
WencongLiu commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206741341 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/OuterJoinITCase.scala: ## @@ -398,7 +398,7 @@ class OuterJoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object OuterJoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: Same reason as before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
WencongLiu commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206740602 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala: ## @@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object JoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: If "name" is modified to "expectedJoinType", it can't be compiled successfully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
WencongLiu commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206740122 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala: ## @@ -26,23 +26,31 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, InMemoryLookupableTableSource} import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager +import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assumptions.assumeThat import org.assertj.core.api.IterableAssert.assertThatIterable -import org.junit.{After, Assume, Before, Test} -import org.junit.Assert.assertEquals -import org.junit.runner.RunWith -import org.junit.runners.Parameterized +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestTemplate} +import org.junit.jupiter.api.Assertions.assertEquals Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
WencongLiu commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206739358 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala: ## @@ -470,7 +469,7 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object SemiJoinITCase { - @Parameterized.Parameters(name = "{0}-{1}") + @Parameters(name = "{0}-{1}") Review Comment: Yes. There is one parameter. ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala: ## @@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object JoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: If "name" is modified to "expectedJoinType", it can't be compiled successfully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP
[ https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726616#comment-17726616 ] Matthias Pohl commented on FLINK-32204: --- FYI: I couldn't reproduce the error locally with 1 test executions. > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with > The ExecutorService is shut down already. No Callables can be executed on AZP > --- > > Key: FLINK-32204 > URL: https://issues.apache.org/jira/browse/FLINK-32204 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095] > fails as > {noformat} > May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: > The ExecutorService is shut down already. No Callables can be executed. 
> May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237) > May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553) > May 25 18:45:50 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP
[ https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726613#comment-17726613 ] Matthias Pohl commented on FLINK-32204: --- The curator's {{TreeCache}} isn't thread-safe: The event processing within the cache is triggered from within the client's EventThread and calls {{TreeCache#publishEvent}}: {code} [...] May 25 18:45:50 at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902) May 25 18:45:50 at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894) [...] May 25 18:45:50 at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634) May 25 18:45:50 at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553) [...] {code} {{TreeCache#publishEvent}} will submit a new task to the cache's {{executorService}} (see [TreeCache:901|https://github.com/apache/curator/blob/844c0ad36340b695b2784489c078cfd78522143c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java#L901]) which is a {{directExecutorService}} in the case of Flink's ZooKeeper LeaderElectionDriver implementations (see [ZooKeeperUtils:764|https://github.com/apache/flink/blob/4576e4384ff36623712043564039f654c3b44a30/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L764] for the legacy {{ZooKeeperLeaderElectionDriver}} and [ZooKeeperMultipleComponentLeaderElectionDriver:76|https://github.com/apache/flink/blob/8ddfd590ebba7fc727e79db41b82d3d40a02b56a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java#L76]). The close call happens in the test's main thread. 
{{TreeCache#close}} sets the cache's state to {{CLOSED}} in [TreeCache:628|https://github.com/apache/curator/blob/844c0ad36340b695b2784489c078cfd78522143c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java#L628]. {{TreeCache#publishEvent}} checks this state in [TreeCache:898|https://github.com/apache/curator/blob/844c0ad36340b695b2784489c078cfd78522143c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java#L898], but without holding a lock. This allows a race condition: {{publishEvent}} can pass the state check before the test's main thread triggers the close, yet submit its task only after the executor has been shut down, which causes the {{RejectedExecutionException}} we're observing right now. [~dmvk], could you verify my finding? I would suggest adding a less restrictive version of the {{DirectExecutorService}} that we could use in the production code to avoid running into this bug, while keeping the more restrictive version (introduced in FLINK-31995) in the test code. WDYT, David? > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with > The ExecutorService is shut down already. No Callables can be executed on AZP > --- > > Key: FLINK-32204 > URL: https://issues.apache.org/jira/browse/FLINK-32204 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.18.0 > Reporter: Sergey Nuyanzin > Assignee: Matthias Pohl > Priority: Critical > Labels: test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095] > fails as > {noformat} > May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: > The ExecutorService is shut down already. No Callables can be executed. 
> May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237) > May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79) > May 25 18:45:50 at >
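To make the suspected interleaving concrete, here is a minimal toy model of the race (hypothetical class and method names, not Curator's or Flink's actual code): the closed-state check and the executor submission are not atomic, so a close() from the test's main thread can slip in between them. The {{inWindow}} hook stands in for whatever the other thread does inside that window.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the race described above (hypothetical, not Curator's TreeCache).
class RacyCache {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean executorShutDown = new AtomicBoolean(false);

    // Models a direct executor's submit: runs the task inline, rejects after shutdown.
    private void submitDirect(Runnable task) {
        if (executorShutDown.get()) {
            throw new RejectedExecutionException(
                    "The ExecutorService is shut down already. No Callables can be executed.");
        }
        task.run();
    }

    // Models publishEvent: the CLOSED check and the submit are NOT atomic.
    // 'inWindow' simulates what another thread does between the two steps.
    void publishEvent(Runnable event, Runnable inWindow) {
        if (!closed.get()) {     // state check passes here ...
            inWindow.run();      // ... the main thread may call close() right now ...
            submitDirect(event); // ... so the submission can still be rejected.
        }
    }

    // Models close(), called from the test's main thread.
    void close() {
        closed.set(true);
        executorShutDown.set(true);
    }
}
```

Passing {{close}} as the in-window action reproduces the {{RejectedExecutionException}} deterministically; making the check-and-submit atomic, or tolerating rejections after close, would remove it.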
[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen edited comment on FLINK-32188 at 5/26/23 12:32 PM: [~jark] Thank you for your reply! Should I submit a PR for this on GitHub, or is there a better way to fix it? When I fix the issue through the method I described above and url is defined as an array, I can obtain the predicates successfully. But I found another issue while debugging: when url is defined as a string rather than an array, SQL like this returns no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg' " + "and url = 'bbb.jpg'"; {code} This makes sense, because 'url' cannot equal two different values simultaneously. But when I defined url as an array type, SQL like this unexpectedly returned two records! {code:java} String s3_and = "select * from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} While debugging I found that the array-typed query does enter *PushFilterIntoTableSourceScanRule* (in org.apache.flink.table.planner.plan.rules.logical), but when url is a string, the "url = xxx and url = yyy" query never reaches the rule's 'onMatch' method. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused about where the framework verifies that a field cannot equal two different values. In which module is this handled? It presumably runs before 'PushFilterIntoTableSourceScanRule', but where exactly? I think this verification logic may need to be fixed for array types. 
> Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.12.2, 1.17.0, 1.16.1 > Reporter: Xin Chen > Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > I customized a data source connector (call it image-connector) and, when creating the table with DDL, declared a field URL as an array type. When submitting a SQL task with Flink, I queried this field against a fixed array, e.g. select * from image_source where URL = ARRAY['/flink.jpg', '/flink_1.jpg'], but the connector couldn't obtain the corresponding predicate filters at all. > Does the custom connector not support querying fields of array type with > "where"?
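As a side note on the contradiction check the comment asks about: reducing {{url = X AND url = Y}} (with X different from Y) to FALSE only requires comparing the two literals by value. The sketch below is a toy model of that reduction (hypothetical names, not Flink's planner code); it handles array literals the same way as strings, provided the comparison is element-wise.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy contradiction check (hypothetical, not Flink's planner): a conjunction of
// equality predicates binding the same field to two different constants is always false.
class EqualityPredicateChecker {
    // Each entry maps a field name to its literal value (e.g. a String or a List<String>).
    static boolean isAlwaysFalse(List<? extends Map.Entry<String, ?>> equalsPredicates) {
        Map<String, Object> seen = new HashMap<>();
        for (Map.Entry<String, ?> p : equalsPredicates) {
            Object previous = seen.putIfAbsent(p.getKey(), p.getValue());
            // Objects.equals compares List literals element-wise, so array-typed
            // constants are handled just like plain string constants.
            if (previous != null && !Objects.equals(previous, p.getValue())) {
                return true; // the field would have to equal two different constants
            }
        }
        return false;
    }
}
```

Under this model, "url = 'aaa.jpg' AND url = 'bbb.jpg'" and "url = ARRAY['aaa.jpg','bbb.jpg'] AND url = ARRAY['ccc.jpg','ddd.jpg']" both reduce to FALSE; if the planner's reduction only fires for comparable scalar types, that would explain the asymmetry the comment observes.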
[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen edited comment on FLINK-32188 at 5/26/23 12:30 PM: [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select * from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values. In which module is this processed?Perhaps it should be in the code before ‘PushFilterIntoTableSourceScanRule’, but where is it? I think it need to be fixed for array types. 
was (Author: JIRAUSER298666): [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select * from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values. In which module is this processed?I think it need to be fixed. > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector which assumed as image-connector, I > found that when creating a table with ddl, I specified a field URL as an > array type. When submitting an SQL task with Flink, I specified query this > field as a fixed array. For example, select * from image source where
[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen edited comment on FLINK-32188 at 5/26/23 12:26 PM: [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select * from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values. In which module is this processed?I think it need to be fixed. was (Author: JIRAUSER298666): [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? 
When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select * from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector which assumed as image-connector, I > found that when creating a table with ddl, I specified a field URL as an > array type. When submitting an SQL task with Flink, I specified query this > field as a fixed array. For example, select * from image source where > URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the > corresponding
[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen edited comment on FLINK-32188 at 5/26/23 12:24 PM: [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select * from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. 
was (Author: JIRAUSER298666): [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select image, errorCode from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector which assumed as image-connector, I > found that when creating a table with ddl, I specified a field URL as an > array type. When submitting an SQL task with Flink, I specified query this > field as a fixed array. For example, select * from image source where > URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it
[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen edited comment on FLINK-32188 at 5/26/23 12:23 PM: [~jark] Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select image, errorCode from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. 
was (Author: JIRAUSER298666): [~jark]Jark Wu Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select image, errorCode from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector which assumed as image-connector, I > found that when creating a table with ddl, I specified a field URL as an > array type. When submitting an SQL task with Flink, I specified query this > field as a fixed array. For example, select * from image source where > URL=ARRAY ['/flink. jpg', '/flink_1.
[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen commented on FLINK-32188: -- Jark Wu Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select image, errorCode from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector which assumed as image-connector, I > found that when creating a table with ddl, I specified a field URL as an > array type. When submitting an SQL task with Flink, I specified query this > field as a fixed array. For example, select * from image source where > URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the > corresponding predicate filters at all. > Does the custom connector not support to query fields of array type with > "where"? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609 ] Xin Chen edited comment on FLINK-32188 at 5/26/23 12:22 PM: [~jark]Jark Wu Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select image, errorCode from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. 
was (Author: JIRAUSER298666): Jark Wu Thank you for your reply!So should I submit a PR about this in github?Or the better way to fix this? When I fix the issue through the method I introduced above,and url is defined as an array,I can obtain predicates successfully. But I found another issue in my debugging process at the same time,when url is defined as string not an array,sql like this returned no results: {code:java} String s1_and = "select * from image_source " + "where url = 'aaa.jpg'" + "and url = 'bbb.jpg'"; {code} This is because ‘url‘ cannot be assigned two different values simultaneously. I think this is logical. But when I defined it as an array type, sql like this returned two records unexpectedly! {code:java} String s3_and = "select image, errorCode from image_source where " + "url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 'ddd.jpg']"; {code} I debugged and found that the task has entered the *PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. plan. rules. logical, but when url is a string,the task of “url = xxx and url = yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. !screenshot-8.png! {code:java} // filter: rel#100:LogicalFilter.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#99,condition= AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', _UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL), =($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT NULL))) {code} I am confused how the frame work to verify that a field cannot be equal to two values at the same time. In which module is this processed?I think it need to be fixed. > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? 
> -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector (assumed to be image-connector), I > found that when creating a table with DDL and declaring a field URL as an > array type, and then submitting a Flink SQL task that queries this field > against a fixed array, for example select * from image_source where > URL=ARRAY['/flink.jpg', '/flink_1.
[jira] [Updated] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Chen updated FLINK-32188: - Attachment: screenshot-8.png > Does the custom connector not support pushing down "where" query predicates > to query fields of array type? > -- > > Key: FLINK-32188 > URL: https://issues.apache.org/jira/browse/FLINK-32188 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.2, 1.17.0, 1.16.1 >Reporter: Xin Chen >Priority: Major > Attachments: image-2023-05-25-17-16-02-288.png, > image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, > screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png > > > When I customized a data source connector (assumed to be image-connector), I > found that when creating a table with DDL and declaring a field URL as an > array type, and then submitting a Flink SQL task that queries this field > against a fixed array, for example select * from image_source where > URL=ARRAY['/flink.jpg', '/flink_1.jpg'], it couldn't obtain the > corresponding predicate filters at all. > Does the custom connector not support querying fields of array type with > "where"? -- This message was sent by Atlassian Jira (v8.20.10#820010)
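The folding the comment asks about (a conjunction of equality predicates on the same field with different literals is unsatisfiable) can be sketched outside Flink. This is a minimal standalone illustration, not Flink's or Calcite's actual implementation; the class and method names here are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: detect "field = v1 AND field = v2" contradictions
// by remembering the literal each field is bound to.
public class ContradictionCheck {
    // Each element of `equalities` is {fieldName, literal}. Returns true
    // if the conjunction of all equality predicates can be satisfied.
    static boolean satisfiable(List<String[]> equalities) {
        Map<String, String> bound = new HashMap<>();
        for (String[] eq : equalities) {
            String prev = bound.putIfAbsent(eq[0], eq[1]);
            if (prev != null && !prev.equals(eq[1])) {
                return false; // same field bound to two different values
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // url = 'aaa.jpg' AND url = 'bbb.jpg' -> unsatisfiable, no rows
        System.out.println(satisfiable(Arrays.asList(
                new String[] {"url", "aaa.jpg"},
                new String[] {"url", "bbb.jpg"}))); // false
        // The array case should fold the same way: two different ARRAY
        // literals for the same field cannot both hold.
        System.out.println(satisfiable(Arrays.asList(
                new String[] {"url", "ARRAY['aaa.jpg','bbb.jpg']"},
                new String[] {"url", "ARRAY['ccc.jpg','ddd.jpg']"}))); // false
    }
}
```

The point of the sketch is only that the check is type-agnostic: it compares bound values for the same field, so it should apply equally to string and array literals.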
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206683144 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java: ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.reader; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.Record; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.Set; + +/** + * An implementation of the SplitReader that periodically polls the Kinesis stream to retrieve + * records. 
+ */
+@Internal
+public class PollingKinesisShardSplitReader implements SplitReader<Record, KinesisShardSplit> {
+
+    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
+            new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);
+
+    private final StreamProxy kinesis;
+    private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();
+
+    public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
+        this.kinesis = kinesisProxy;
+    }
+
+    @Override
+    public RecordsWithSplitIds<Record> fetch() throws IOException {
+        KinesisShardSplitState splitState = assignedSplits.poll();
+        if (splitState == null) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+
+        GetRecordsResponse getRecordsResponse =
+                kinesis.getRecords(
+                        splitState.getStreamArn(),
+                        splitState.getShardId(),
+                        splitState.getNextStartingPosition());
+        boolean isComplete = getRecordsResponse.nextShardIterator() == null;
+
+        if (hasNoRecords(getRecordsResponse)) {
+            if (isComplete) {
+                return new KinesisRecordsWithSplitIds(
+                        Collections.emptyIterator(), splitState.splitId(), true);
+            } else {
+                assignedSplits.add(splitState);
+                return INCOMPLETE_SHARD_EMPTY_RECORDS;
+            }
+        }
+
+        splitState.setNextStartingPosition(
+                StartingPosition.continueFromSequenceNumber(
+                        getRecordsResponse
+                                .records()
+                                .get(getRecordsResponse.records().size() - 1)
+                                .sequenceNumber()));
+
+        assignedSplits.add(splitState);
+        return new KinesisRecordsWithSplitIds(
+                getRecordsResponse.records().iterator(), splitState.splitId(), isComplete);
+    }
+
+    private boolean hasNoRecords(GetRecordsResponse getRecordsResponse) {
+        return !getRecordsResponse.hasRecords() || getRecordsResponse.records().isEmpty();
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {
+        for (KinesisShardSplit split : splitsChanges.splits()) {
+            assignedSplits.add(new KinesisShardSplitState(split));
+        }
+    }
+
+    @Override
+    public void wakeUp() {
+        // Do nothing because we don't have any sleep mechanism
+    }
+
+    @Override
+    public void close() throws Exception {}

Review Comment: Good catch. 
Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to
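The fetch loop in the reader above follows a simple round-robin pattern over assigned splits: take the next split from a deque, poll it, and re-enqueue it unless it has finished. Reduced to its essentials (the names below are illustrative, not part of the connector):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Round-robin polling over assigned splits, as in the reader's fetch().
public class RoundRobinPoller {
    private final Deque<String> splits = new ArrayDeque<>();

    void assign(String splitId) {
        splits.add(splitId);
    }

    // Polls the next split once; returns its id, or null when none are assigned.
    // Unfinished splits go back to the tail of the deque so all splits are
    // visited fairly across fetch calls.
    String pollOnce(Predicate<String> isFinished) {
        String split = splits.poll();
        if (split == null) {
            return null;
        }
        if (!isFinished.test(split)) {
            splits.add(split); // keep rotating unfinished splits
        }
        return split;
    }
}
```

The deque re-enqueue is what gives each shard a turn even when one shard keeps returning empty batches; a finished split (null next shard iterator in the real reader) simply drops out of the rotation.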
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206671480 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java: ## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition; +import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; +import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; +import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION; +import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        if (state == null) {
+            this.completedSplitIds = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedSplitIds = state.getCompletedSplitIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206669438 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.serialization; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.IOException; + +/** + * A simple wrapper for using the {@link DeserializationSchema} with the {@link + * KinesisDeserializationSchema} interface. + * + * @param The type created by the deserialization schema. 
+ */
+@Internal
+class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
+    private static final long serialVersionUID = 9143148962928375886L;
+
+    private final DeserializationSchema<T> deserializationSchema;
+
+    KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        this.deserializationSchema.open(context);
+    }
+
+    @Override
+    public void deserialize(Record record, String stream, String shardId, Collector<T> output)

Review Comment: They are part of the `Record` datatype
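The wrapper above is a plain delegation adapter: it accepts source-specific metadata (stream, shard id) but forwards only the payload to the wrapped generic deserializer. A minimal standalone illustration of the same pattern, using simplified stand-in interfaces rather than Flink's actual types:

```java
import java.util.ArrayList;
import java.util.List;

// Adapt a generic byte-array deserializer to a source-specific interface
// by delegation. Interfaces here are simplified stand-ins, not Flink's.
public class WrapperDemo {
    interface ByteDeserializer<T> {
        T deserialize(byte[] data);
    }

    interface RecordDeserializer<T> {
        void deserialize(byte[] payload, String shardId, List<T> out);
    }

    // The wrapper ignores the source-specific metadata (shardId) and
    // delegates payload decoding to the wrapped deserializer.
    static <T> RecordDeserializer<T> wrap(ByteDeserializer<T> inner) {
        return (payload, shardId, out) -> out.add(inner.deserialize(payload));
    }

    public static void main(String[] args) {
        RecordDeserializer<String> d = wrap(String::new);
        List<String> out = new ArrayList<>();
        d.deserialize("hello".getBytes(), "shard-0", out);
        System.out.println(out); // [hello]
    }
}
```

Users who need the metadata can implement the source-specific interface directly instead of going through the wrapper, which is exactly the split the real connector offers.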
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206669002 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** An implementation of the {@link KinesisShardAssigner} that assigns splits uniformly. 
 */
+@Internal
+public class UniformShardAssigner implements KinesisShardAssigner {
+    @Override
+    public int assign(KinesisShardSplit split, Context context) {
+        int selectedSubtask = -1;
+        int curMinAssignment = Integer.MAX_VALUE;
+        Map<Integer, Set<KinesisShardSplit>> splitAssignment = context.getCurrentSplitAssignment();
+        Map<Integer, List<KinesisShardSplit>> pendingSplitAssignments =
+                context.getPendingSplitAssignments();
+
+        for (int subtaskId : context.getRegisteredReaders().keySet()) {
+            int subtaskAssignmentSize =
+                    splitAssignment.getOrDefault(subtaskId, Collections.emptySet()).size()
+                            + pendingSplitAssignments
+                                    .getOrDefault(subtaskId, Collections.emptyList())
+                                    .size();
+            if (subtaskAssignmentSize < curMinAssignment) {
+                curMinAssignment = subtaskAssignmentSize;
+                selectedSubtask = subtaskId;
+            }
+        }

Review Comment: Ok added.
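The assigner above implements least-loaded selection: for each registered reader subtask, sum its current and pending split counts and pick the minimum. The same selection logic, as a standalone sketch rather than the connector's actual class:

```java
import java.util.Map;
import java.util.Set;

// Least-loaded subtask selection, as used by the uniform shard assigner:
// pick the registered subtask with the fewest current + pending splits.
public class LeastLoaded {
    static int select(
            Set<Integer> registeredSubtasks,
            Map<Integer, Integer> currentCounts,
            Map<Integer, Integer> pendingCounts) {
        int selected = -1;
        int min = Integer.MAX_VALUE;
        for (int subtask : registeredSubtasks) {
            // A subtask missing from either map simply has no splits there.
            int load = currentCounts.getOrDefault(subtask, 0)
                    + pendingCounts.getOrDefault(subtask, 0);
            if (load < min) {
                min = load;
                selected = subtask;
            }
        }
        return selected; // -1 when no subtasks are registered
    }
}
```

Counting pending assignments alongside current ones matters: without it, a burst of newly discovered shards would all land on whichever subtask happened to be emptiest at the start of the burst.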
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r120406 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION; +import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; +import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final ShardAssignerContext shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisShardAssigner shardAssigner,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = shardAssigner;
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        if (state == null) {
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits =
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206663759 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java: ## @@ -0,0 +1,347 @@
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206665143 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java: ## @@ -0,0 +1,347 @@
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206664219

## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:

@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final ShardAssignerContext shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisShardAssigner shardAssigner,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = shardAssigner;
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        if (state == null) {
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits =
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206662694

## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:

@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final ShardAssignerContext shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisShardAssigner shardAssigner,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = shardAssigner;
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        if (state == null) {
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits =
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206657247

## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java:

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+
+/** Utility functions to use with {@link KinesisStreamsSourceConfigConstants}. */
+@Internal
+public class KinesisStreamsSourceConfigUtil {
+
+    private KinesisStreamsSourceConfigUtil() {
+        // private constructor to prevent initialization of utility class.
+    }
+
+    /**
+     * Parses the timestamp in which to start consuming from the stream, from the given properties.
+     *
+     * @param consumerConfig the properties to parse timestamp from
+     * @return the timestamp
+     */
+    public static Date parseStreamTimestampStartingPosition(final Properties consumerConfig) {
+        String timestamp = consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP);

Review Comment:
   done

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
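The rest of {@code parseStreamTimestampStartingPosition} is elided in the quoted hunk, but the established pattern (in the legacy Kinesis consumer's config util) is to try the configured date format first and fall back to interpreting the value as epoch seconds. A minimal standalone sketch of that pattern follows; the property keys and the default format string here are illustrative assumptions, not the connector's actual constants:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class TimestampParseSketch {
    // Hypothetical keys/format for illustration; the real constants live in
    // KinesisStreamsSourceConfigConstants.
    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
    public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    public static final String DEFAULT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    public static Date parseStreamTimestampStartingPosition(final Properties consumerConfig) {
        String timestamp = consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP);
        try {
            // First attempt: parse with the configured (or default) date format.
            String format =
                    consumerConfig.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_FORMAT);
            return new SimpleDateFormat(format).parse(timestamp);
        } catch (ParseException e) {
            // Fallback: interpret the value as (possibly fractional) epoch seconds.
            return new Date((long) (Double.parseDouble(timestamp) * 1000));
        }
    }
}
```

Either form of input then resolves to the same instant, e.g. `"2023-01-01T00:00:00.000+00:00"` and `"1672531200"` both yield epoch millisecond 1672531200000.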
[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726598#comment-17726598 ]

Jark Wu commented on FLINK-32188:
---------------------------------

But I'm also confused why the array constructor is not evaluated into a literal before being pushed down.

> Does the custom connector not support pushing down "where" query predicates
> to query fields of array type?
> --------------------------------------------------------------------------
>
>                 Key: FLINK-32188
>                 URL: https://issues.apache.org/jira/browse/FLINK-32188
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2, 1.17.0, 1.16.1
>            Reporter: Xin Chen
>            Priority: Major
>         Attachments: image-2023-05-25-17-16-02-288.png, image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, screenshot-5.png, screenshot-6.png, screenshot-7.png
>
> When I implemented a custom data source connector (assume it is called image-connector), I created a table via DDL that declares a field URL of array type. When I submitted a SQL job with Flink that queries this field against a fixed array, for example select * from image_source where URL=ARRAY['/flink.jpg', '/flink_1.jpg'], the connector could not obtain the corresponding predicate filters at all.
> Do custom connectors not support "where" predicates on fields of array type?

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206656720

## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java:

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner;
+import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Builder to construct the {@link KinesisStreamsSource}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link KinesisStreamsSource} that
+ * reads String values from a Kinesis Data Streams stream with ARN of
+ * arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name.
+ *
+ * <pre>{@code
+ * KinesisStreamsSource<String> kdsSource =
+ *         KinesisStreamsSource.<String>builder()
+ *                 .setStreamArn("arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name")
+ *                 .setDeserializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code kinesisShardAssigner} will be {@link UniformShardAssigner}
+ * </ul>
+ *
+ * @param <T> type of elements that should be read from the source stream
+ */
+@Experimental
+public class KinesisStreamsSourceBuilder<T> {
+    private String streamArn;
+    private Properties consumerConfig = new Properties();

Review Comment:
   Ok done. We can add feature to parse region from the ARN in a followup

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
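The followup mentioned in the review comment (deriving the region from the stream ARN) amounts to splitting the ARN on colons, since an ARN has the fixed shape `arn:partition:service:region:account-id:resource`. A hedged sketch of such a helper; the method name and validation are illustrative, not part of the connector's API:

```java
public class ArnRegionSketch {
    /**
     * Extracts the region component from an ARN of the form
     * arn:partition:service:region:account-id:resource.
     * Hypothetical helper for illustration only.
     */
    public static String parseRegionFromArn(String arn) {
        // Limit to 6 parts so colons inside the resource component stay intact.
        String[] parts = arn.split(":", 6);
        if (parts.length < 6 || !"arn".equals(parts[0])) {
            throw new IllegalArgumentException("Not a valid ARN: " + arn);
        }
        return parts[3];
    }
}
```

For the ARN in the Javadoc example above, this would return `us-east-1`, which could then be cross-checked against any region set in the consumer config (the validation FLINK-31990 describes).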
[jira] [Updated] (FLINK-31990) Use Flink Configuration to specify KDS Source configuration object
[ https://issues.apache.org/jira/browse/FLINK-31990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-31990: Description: *What* Use the Flink Configuration object to standardise the method of specifying configurations for the KDS source. Also include validations: - Check that region in config matches ARN. ARN should take priority. *Why* We want to standardise error messages + source serialization methods implemented by Flink on the Flink Configuration objects. was: *What* Use the Flink Configuration object to standardise the method of specifying configurations for the KDS source. *Why* We want to standardise error messages + source serialization methods implemented by Flink on the Flink Configuration objects. > Use Flink Configuration to specify KDS Source configuration object > -- > > Key: FLINK-31990 > URL: https://issues.apache.org/jira/browse/FLINK-31990 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Priority: Major > > *What* > Use the Flink Configuration object to standardise the method of specifying > configurations for the KDS source. > > Also include validations: > - Check that region in config matches ARN. ARN should take priority. > > *Why* > We want to standardise error messages + source serialization methods > implemented by Flink on the Flink Configuration objects. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
hlteoh37 commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206655247

## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java:

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+
+/** Utility functions to use with {@link KinesisStreamsSourceConfigConstants}. */
+@Internal
+public class KinesisStreamsSourceConfigUtil {

Review Comment:
   Yes. I have added this to https://issues.apache.org/jira/browse/FLINK-31990

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?
[ https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726597#comment-17726597 ]

Jark Wu commented on FLINK-32188:
---------------------------------

Yes, I think your fix is fine. The function name "array" in BuiltInFunctionDefinition is not equal to "ARRAY_VALUE_CONSTRUCTOR"; I think that's why we can't find the function via {{lookupFunction}}.

> Does the custom connector not support pushing down "where" query predicates
> to query fields of array type?
> --------------------------------------------------------------------------
>
>                 Key: FLINK-32188
>                 URL: https://issues.apache.org/jira/browse/FLINK-32188
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2, 1.17.0, 1.16.1
>            Reporter: Xin Chen
>            Priority: Major
>         Attachments: image-2023-05-25-17-16-02-288.png, image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, screenshot-5.png, screenshot-6.png, screenshot-7.png
>
> When I implemented a custom data source connector (assume it is called image-connector), I created a table via DDL that declares a field URL of array type. When I submitted a SQL job with Flink that queries this field against a fixed array, for example select * from image_source where URL=ARRAY['/flink.jpg', '/flink_1.jpg'], the connector could not obtain the corresponding predicate filters at all.
> Do custom connectors not support "where" predicates on fields of array type?

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
[ https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32176: --- Labels: pull-request-available (was: ) > [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all > - > > Key: FLINK-32176 > URL: https://issues.apache.org/jira/browse/FLINK-32176 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.0 >Reporter: Samrat Deb >Assignee: Samrat Deb >Priority: Major > Labels: pull-request-available > > > * *CVE ID:* {{CVE-2022-1471}} > * *CWE:* CWE-502 Deserialization of Untrusted Data > * {*}Severity{*}: Critical > {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}} > > {{SnakeYaml's Constructor() class does not restrict types which can be > instantiated during deserialization. Deserializing yaml content provided by > an attacker can lead to remote code execution. We recommend using SnakeYaml's > SafeConsturctor when parsing untrusted content to restrict deserialization.}} > {{}} > {{More details : https://nvd.nist.gov/vuln/detail/CVE-2022-1471}} > {{{}{}}}{{{}{}}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-pulsar] Samrat002 opened a new pull request, #51: [FLINK-32176] Exclude snakeyaml from pulsar-client-all
Samrat002 opened a new pull request, #51:
URL: https://github.com/apache/flink-connector-pulsar/pull/51

## Purpose of the change

Mitigate the impact of the CVE in Flink.

## Brief change log

Exclude snakeyaml from pulsar-client-all.

## Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added unit tests*
- *Added integration tests for end-to-end deployment*
- *Manually verified by running the Pulsar connector on a local Flink cluster.*

## Significant changes

*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*
- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [ ] New feature has been introduced
  - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
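For context, a Maven dependency exclusion of the shape the PR title describes typically looks like the sketch below. The coordinates and version property are illustrative assumptions (the actual pom in the connector repo may differ), and note the caveat from the CVE report: snakeyaml is listed as *shaded* inside pulsar-client-all, in which case a plain exclusion would not remove classes already bundled in the jar.

```xml
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client-all</artifactId>
  <version>${pulsar.version}</version>
  <exclusions>
    <exclusion>
      <!-- CVE-2022-1471: keep the vulnerable snakeyaml off the classpath -->
      <groupId>org.yaml</groupId>
      <artifactId>snakeyaml</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```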
[jira] [Updated] (FLINK-32008) Protobuf format cannot work with FileSystem Connector
[ https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li updated FLINK-32008: --- Summary: Protobuf format cannot work with FileSystem Connector (was: Protobuf format throws exception with Map datatype) > Protobuf format cannot work with FileSystem Connector > - > > Key: FLINK-32008 > URL: https://issues.apache.org/jira/browse/FLINK-32008 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.0 >Reporter: Xuannan Su >Priority: Major > Attachments: flink-protobuf-example.zip > > > The protobuf format throws exception when working with Map data type. I > uploaded a example project to reproduce the problem. > > {code:java} > Caused by: java.lang.RuntimeException: One or more fetchers have encountered > exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) 
> at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: java.io.IOException: Failed to deserialize PB object. > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124) > at > org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82) > at > 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > ... 6 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
[jira] [Commented] (FLINK-32008) Protobuf format cannot work with FileSystem Connector
[ https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726583#comment-17726583 ] Benchao Li commented on FLINK-32008: I've changed the title. > Protobuf format cannot work with FileSystem Connector > - > > Key: FLINK-32008 > URL: https://issues.apache.org/jira/browse/FLINK-32008 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.0 >Reporter: Xuannan Su >Priority: Major > Attachments: flink-protobuf-example.zip > > > The protobuf format throws exception when working with Map data type. I > uploaded a example project to reproduce the problem. > > {code:java} > Caused by: java.lang.RuntimeException: One or more fetchers have encountered > exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: java.io.IOException: Failed to deserialize PB object. > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124) > at > org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82) > at > 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > ... 6 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP
[ https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726580#comment-17726580 ] Matthias Pohl commented on FLINK-32204: --- It's most likely being caused by FLINK-31995. The {{ZooKeeperLeaderElectionDriver}} uses a {{DirectExecutorService}} for the {{TreeCache}}. The {{RejectedExecutionException}} handling was added in FLINK-31995. So, it could be that it reveals a bug in some other code which was just not visible before. > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with > The ExecutorService is shut down already. No Callables can be executed on AZP > --- > > Key: FLINK-32204 > URL: https://issues.apache.org/jira/browse/FLINK-32204 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095] > fails as > {noformat} > May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: > The ExecutorService is shut down already. No Callables can be executed. 
> May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237) > May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553) > May 25 18:45:50 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
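The failure mode in the trace above can be sketched without any Flink or Curator dependencies: a "direct" executor runs submitted tasks on the caller's thread, but once shut down it rejects everything — so a late ZooKeeper watcher callback that still tries to publish a {{TreeCache}} event hits a {{RejectedExecutionException}}. This is only an illustrative stand-in, not Flink's actual {{DirectExecutorService}}:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal stand-in for a direct executor: tasks run on the calling thread,
// and anything submitted after shutdown is rejected -- the behaviour that
// the FLINK-31995 change made visible in this stack trace.
class DirectExecutorSketch {
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    void execute(Runnable task) {
        if (shutdown.get()) {
            throw new RejectedExecutionException(
                    "The ExecutorService is shut down already. No Callables can be executed.");
        }
        task.run(); // direct execution on the caller's thread
    }

    void shutdown() {
        shutdown.set(true);
    }

    public static void main(String[] args) {
        DirectExecutorSketch executor = new DirectExecutorSketch();
        executor.execute(() -> System.out.println("leader event published"));

        executor.shutdown();
        try {
            // A ZooKeeper callback arriving after the test tears down the
            // driver would land here in the real scenario.
            executor.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The race is then simply a callback thread outliving the executor's lifecycle — which is why the exception may indicate a shutdown-ordering bug elsewhere rather than in the executor itself.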
[GitHub] [flink] zentol commented on pull request #22660: [FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…
zentol commented on PR #22660: URL: https://github.com/apache/flink/pull/22660#issuecomment-1564201206 I appreciate the work, but really this change has to go through a proper design process. There are open questions as to whether we really want to expose Kryo in the same way we did before (because the presence of Kryo serializers in the execution config is a problem in general). Please create a [FLIP](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals) and publish it on the dev mailing list. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22666: [Just For Test] Update japicmp configuration for 1.16.2
flinkbot commented on PR #22666: URL: https://github.com/apache/flink/pull/22666#issuecomment-1564193372 ## CI report: * f978740193f6053e57003801e870b91b398a3d18 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22665: [Just For Test] Update japicmp configuration for 1.17.1
flinkbot commented on PR #22665: URL: https://github.com/apache/flink/pull/22665#issuecomment-1564188090 ## CI report: * 5ac854d04e1f587da5f70ea96326bb4d1d674497 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] reswqa merged pull request #650: Add Flink 1.17.1
reswqa merged PR #650: URL: https://github.com/apache/flink-web/pull/650 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] reswqa merged pull request #649: Add Flink 1.16.2
reswqa merged PR #649: URL: https://github.com/apache/flink-web/pull/649 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32008) Protobuf format throws exception with Map datatype
[ https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726567#comment-17726567 ] Ryan Skraba commented on FLINK-32008: - It's probably worth checking in with the community to see what we expect of a bulk format, or if it would be an interesting thing to add! I'll ask on the mailing list. I took a quick look but didn't see any related JIRA (outside of FLINK-12149, which proposes using the protobuf API to interact with parquet files). Can a committer change the title of this JIRA to better reflect the issue? Something like "Protobuf format on filesystem is faulty" > Protobuf format throws exception with Map datatype > -- > > Key: FLINK-32008 > URL: https://issues.apache.org/jira/browse/FLINK-32008 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.0 >Reporter: Xuannan Su >Priority: Major > Attachments: flink-protobuf-example.zip > > > The protobuf format throws exception when working with Map data type. I > uploaded a example project to reproduce the problem. 
> > {code:java} > Caused by: java.lang.RuntimeException: One or more fetchers have encountered > exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: java.io.IOException: Failed to deserialize PB object. > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) > at > org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) > at > org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124) > at > org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82) > at > org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > ... 6 more > Caused by:
[jira] [Commented] (FLINK-32202) useless configuration
[ https://issues.apache.org/jira/browse/FLINK-32202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726566#comment-17726566 ] Weihua Hu commented on FLINK-32202: --- [~zhangdong7] Could you explain more about "but the parameter query.server.ports:6125 will be generated when Flink starts"? I couldn't find the use of "query.server.ports" in the master branch. > useless configuration > - > > Key: FLINK-32202 > URL: https://issues.apache.org/jira/browse/FLINK-32202 > Project: Flink > Issue Type: Improvement > Components: Runtime / Queryable State >Affects Versions: 1.15.4 >Reporter: zhangdong7 >Priority: Minor > > According to the official Flink documentation, the parameter > query.server.ports has been replaced by queryable-state.server.ports, but the > parameter query.server.ports:6125 will be generated when Flink starts. Is > this a historical problem? > > {code:java} > public static final ConfigOption SERVER_PORT_RANGE = > ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The > port range of the queryable state server. The specified range can be a > single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges > and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new > String[]{"query.server.ports"});{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
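The {{withDeprecatedKeys}} mechanism quoted in the issue above resolves the current key first and only falls back to the old key for backwards compatibility. A minimal sketch of that lookup order (the class and method names here are illustrative, not Flink's actual {{ConfigOption}} internals):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of deprecated-key fallback as used by SERVER_PORT_RANGE:
// the current key wins; a deprecated key is consulted only when the
// current key is absent; otherwise the default applies.
class DeprecatedKeyLookup {
    static String resolve(Map<String, String> conf,
                          String currentKey,
                          String deprecatedKey,
                          String defaultValue) {
        if (conf.containsKey(currentKey)) {
            return conf.get(currentKey);
        }
        if (conf.containsKey(deprecatedKey)) {
            // Old configurations keep working, which is why a value under
            // "query.server.ports" can still take effect.
            return conf.get(deprecatedKey);
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("query.server.ports", "6125"); // only the deprecated key is set
        System.out.println(resolve(conf,
                "queryable-state.server.ports", "query.server.ports", "9067"));
        // prints 6125
    }
}
```

Under this scheme a deprecated key is not "useless": it is silently honoured until the user migrates to the replacement key.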
[jira] (FLINK-32202) useless configuration
[ https://issues.apache.org/jira/browse/FLINK-32202 ] Weihua Hu deleted comment on FLINK-32202: --- was (Author: huwh): [~zhangdong7] Could you explain more about "but the parameter query.server.ports:6125 will be generated when Flink starts"? I couldn't find the use of "query.server.ports" in the master branch. > useless configuration > - > > Key: FLINK-32202 > URL: https://issues.apache.org/jira/browse/FLINK-32202 > Project: Flink > Issue Type: Improvement > Components: Runtime / Queryable State >Affects Versions: 1.15.4 >Reporter: zhangdong7 >Priority: Minor > > According to the official Flink documentation, the parameter > query.server.ports has been replaced by queryable-state.server.ports, but the > parameter query.server.ports:6125 will be generated when Flink starts. Is > this a historical problem? > > {code:java} > public static final ConfigOption SERVER_PORT_RANGE = > ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The > port range of the queryable state server. The specified range can be a > single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges > and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new > String[]{"query.server.ports"});{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32202) useless configuration
[ https://issues.apache.org/jira/browse/FLINK-32202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-32202: --- Component/s: Runtime / Queryable State (was: API / Core) > useless configuration > - > > Key: FLINK-32202 > URL: https://issues.apache.org/jira/browse/FLINK-32202 > Project: Flink > Issue Type: Improvement > Components: Runtime / Queryable State >Affects Versions: 1.15.4 >Reporter: zhangdong7 >Priority: Minor > > According to the official Flink documentation, the parameter > query.server.ports has been replaced by queryable-state.server.ports, but the > parameter query.server.ports:6125 will be generated when Flink starts. Is > this a historical problem? > > {code:java} > public static final ConfigOption SERVER_PORT_RANGE = > ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The > port range of the queryable state server. The specified range can be a > single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges > and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new > String[]{"query.server.ports"});{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32202) useless configuration
[ https://issues.apache.org/jira/browse/FLINK-32202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-32202. -- Resolution: Invalid Queryable State is (soft) deprecated, so this doesn't have to be fixed at this moment > useless configuration > - > > Key: FLINK-32202 > URL: https://issues.apache.org/jira/browse/FLINK-32202 > Project: Flink > Issue Type: Improvement > Components: API / Core >Affects Versions: 1.15.4 >Reporter: zhangdong7 >Priority: Minor > > According to the official Flink documentation, the parameter > query.server.ports has been replaced by queryable-state.server.ports, but the > parameter query.server.ports:6125 will be generated when Flink starts. Is > this a historical problem? > > {code:java} > public static final ConfigOption SERVER_PORT_RANGE = > ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The > port range of the queryable state server. The specified range can be a > single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges > and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new > String[]{"query.server.ports"});{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] MartijnVisser commented on pull request #22664: [FLINK-32203] Remove monitoringInterval to reduce probability of ClassLoader memory leak
MartijnVisser commented on PR #22664: URL: https://github.com/apache/flink/pull/22664#issuecomment-1564121721 @JTaky Thanks for the PR. However, would it be possible to actually address the issue of the memory leak, instead of applying the mitigating measure that this PR introduces? Also curious about @zentol's opinion on this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-29750) Improve PostgresCatalog#listTables() by reusing resources
[ https://issues.apache.org/jira/browse/FLINK-29750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-29750. -- Fix Version/s: jdbc-3.2.0 Resolution: Fixed Fixed in apache/flink-connector-jdbc:main 7fc202be3dfcdb6510f9855a6943dd97fa2bd3af > Improve PostgresCatalog#listTables() by reusing resources > - > > Key: FLINK-29750 > URL: https://issues.apache.org/jira/browse/FLINK-29750 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / Ecosystem >Reporter: Mingliang Liu >Assignee: Mingliang Liu >Priority: Major > Labels: pull-request-available > Fix For: jdbc-3.2.0 > > > Currently the {{PostgresCatalog#listTables()}} creates a new connection and > prepared statement for every schema and table when listing tables. This can > be optimized by reusing those resources. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-29750) Improve PostgresCatalog#listTables() by reusing resources
[ https://issues.apache.org/jira/browse/FLINK-29750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reopened FLINK-29750: Assignee: Mingliang Liu > Improve PostgresCatalog#listTables() by reusing resources > - > > Key: FLINK-29750 > URL: https://issues.apache.org/jira/browse/FLINK-29750 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / Ecosystem >Reporter: Mingliang Liu >Assignee: Mingliang Liu >Priority: Major > Labels: pull-request-available > > Currently the {{PostgresCatalog#listTables()}} creates a new connection and > prepared statement for every schema and table when listing tables. This can > be optimized by reusing those resources. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32051) Fix broken documentation links in Flink blogs
[ https://issues.apache.org/jira/browse/FLINK-32051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726556#comment-17726556 ] Weijie Guo commented on FLINK-32051: merged in b64f8efce94018b87a4046c6c4d01e0aed1f5ab5. > Fix broken documentation links in Flink blogs > - > > Key: FLINK-32051 > URL: https://issues.apache.org/jira/browse/FLINK-32051 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Zhilong Hong >Assignee: Zhilong Hong >Priority: Minor > Labels: pull-request-available > > Currently, the links to the documentations in the blogs are broken. We need > to add a slash ({{{}/{}}}) at the end of the param {{DocsBaseUrl}} in > config.toml like this: > {noformat} > [params] > DocsBaseUrl = "//nightlies.apache.org/flink/" > {noformat} > Also, the links in this > [post|https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/] > is not rendered correctly. We need to add a newline after the {{{}{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
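The trailing slash in {{DocsBaseUrl}} matters because, under RFC 3986 relative resolution (which {{java.net.URI}} implements), a base URL without a trailing slash loses its last path segment when a relative link is resolved against it. Hugo's template logic may build the links differently in detail; this only illustrates the pitfall the fix addresses:

```java
import java.net.URI;

// Resolving a relative docs link against the base URL, with and without
// the trailing slash from the FLINK-32051 fix.
public class TrailingSlash {
    public static void main(String[] args) {
        URI noSlash = URI.create("https://nightlies.apache.org/flink");
        URI withSlash = URI.create("https://nightlies.apache.org/flink/");

        // Without the slash, the "flink" segment is treated as a file name
        // and replaced by the relative reference.
        System.out.println(noSlash.resolve("flink-docs-master/"));
        // -> https://nightlies.apache.org/flink-docs-master/

        // With the slash, "flink/" is kept as a directory prefix.
        System.out.println(withSlash.resolve("flink-docs-master/"));
        // -> https://nightlies.apache.org/flink/flink-docs-master/
    }
}
```

The same rule applies in browsers, which is why links built from the slash-less base pointed at non-existent paths.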
[jira] [Resolved] (FLINK-32051) Fix broken documentation links in Flink blogs
[ https://issues.apache.org/jira/browse/FLINK-32051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo resolved FLINK-32051. Resolution: Fixed > Fix broken documentation links in Flink blogs > - > > Key: FLINK-32051 > URL: https://issues.apache.org/jira/browse/FLINK-32051 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Zhilong Hong >Assignee: Zhilong Hong >Priority: Minor > Labels: pull-request-available > > Currently, the links to the documentations in the blogs are broken. We need > to add a slash ({{{}/{}}}) at the end of the param {{DocsBaseUrl}} in > config.toml like this: > {noformat} > [params] > DocsBaseUrl = "//nightlies.apache.org/flink/" > {noformat} > Also, the links in this > [post|https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/] > is not rendered correctly. We need to add a newline after the {{{}{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-jdbc] MartijnVisser commented on pull request #3: [FLINK-15462][connectors] Add Trino dialect
MartijnVisser commented on PR #3: URL: https://github.com/apache/flink-connector-jdbc/pull/3#issuecomment-1564085808 > we’d appreciate it very much if you could please ensure that Flink JDBC connector Trino dialect support Kerberos authentication like Trino Python client, I think that should be a follow-up, and not in the initial PR. Is it also something that you could contribute to? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32059) Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
[ https://issues.apache.org/jira/browse/FLINK-32059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32059: --- Labels: pull-request-available (was: ) > Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and > batch.sql.join to JUnit5 > - > > Key: FLINK-32059 > URL: https://issues.apache.org/jira/browse/FLINK-32059 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available > > Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and > batch.sql.join to JUnit5. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5
reswqa commented on code in PR #22579: URL: https://github.com/apache/flink/pull/22579#discussion_r1206475729 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/OuterJoinITCase.scala: ## @@ -398,7 +398,7 @@ class OuterJoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object OuterJoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: ```suggestion @Parameters(name = "expectedJoinType={0}") ``` ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala: ## @@ -470,7 +469,7 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object SemiJoinITCase { - @Parameterized.Parameters(name = "{0}-{1}") + @Parameters(name = "{0}-{1}") Review Comment: It feels a bit strange, doesn't this test class have only one parameter? ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala: ## @@ -26,23 +26,31 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, InMemoryLookupableTableSource} import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager +import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assumptions.assumeThat import org.assertj.core.api.IterableAssert.assertThatIterable -import org.junit.{After, Assume, Before, Test} -import org.junit.Assert.assertEquals -import org.junit.runner.RunWith -import org.junit.runners.Parameterized +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestTemplate} +import 
org.junit.jupiter.api.Assertions.assertEquals Review Comment: Junit5 assertions are also discouraged. ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala: ## @@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase { } object JoinITCase { - @Parameterized.Parameters(name = "{0}") + @Parameters(name = "{0}") Review Comment: ```suggestion @Parameters(name = "expectedJoinType = {0}") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
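The review suggestion above ("expectedJoinType={0}" rather than a bare "{0}") is about readable test names: JUnit 4's Parameterized runner fills the {{name}} pattern with the parameter's {{toString()}} in MessageFormat style, and Flink's {{ParameterizedTestExtension}} presumably substitutes placeholders similarly (an assumption here). A quick stdlib sketch of the difference the label makes:

```java
import java.text.MessageFormat;

// Why a labelled name pattern is preferred for parameterized tests:
// the bare value is ambiguous in a test report, the labelled one is not.
public class ParamNameSketch {
    public static void main(String[] args) {
        Object joinType = "LeftOuterJoin"; // stand-in for the JoinType parameter

        System.out.println(MessageFormat.format("{0}", joinType));
        // -> LeftOuterJoin                  (what does this value mean?)

        System.out.println(MessageFormat.format("expectedJoinType={0}", joinType));
        // -> expectedJoinType=LeftOuterJoin (self-describing)
    }
}
```

This also explains the second comment: a "{0}-{1}" pattern references a second parameter that a single-parameter test class never supplies.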
[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP
[ https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726545#comment-17726545 ] Matthias Pohl commented on FLINK-32204: --- FYI: CI failed on [a4de8945|https://github.com/apache/flink/commit/a4de8945]. That version didn't include FLINK-31776, yet. > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with > The ExecutorService is shut down already. No Callables can be executed on AZP > --- > > Key: FLINK-32204 > URL: https://issues.apache.org/jira/browse/FLINK-32204 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095] > fails as > {noformat} > May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: > The ExecutorService is shut down already. No Callables can be executed. 
> May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237) > May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553) > May 25 18:45:50 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP
[ https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726540#comment-17726540 ] Matthias Pohl edited comment on FLINK-32204 at 5/26/23 9:14 AM: Thanks for reporting the issue. I'm going to have a look. -But I suspect it not being related to the FLINK-26522 changes because the test still relies on the legacy {{LeaderElectionDriver}} implementation of ZooKeeper.- This specific test utilizes the {{DefaultLeaderElectionService}} which was touched in FLINK-31838 and FLINK-31773 was (Author: mapohl): Thanks for reporting the issue. I'm going to have a look. But I suspect it not being related to the FLINK-26522 changes because the test still relies on the legacy {{LeaderElectionDriver}} implementation of ZooKeeper. > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with > The ExecutorService is shut down already. No Callables can be executed on AZP > --- > > Key: FLINK-32204 > URL: https://issues.apache.org/jira/browse/FLINK-32204 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [This > build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095] > fails as > {noformat} > May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: > The ExecutorService is shut down already. No Callables can be executed. 
> May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237) > May 25 18:45:50 at > org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152) > May 25 18:45:50 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634) > May 25 18:45:50 at > org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553) > May 25 18:45:50 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)