[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-05-26 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726773#comment-17726773
 ] 

Sergey Nuyanzin commented on FLINK-30629:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49421=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9992

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all

2023-05-26 Thread Samrat Deb (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Samrat Deb updated FLINK-32176:
---
Attachment: (was: Screenshot 2023-05-27 at 9.00.53 AM.png)

> [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
> -
>
> Key: FLINK-32176
> URL: https://issues.apache.org/jira/browse/FLINK-32176
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
>
>  
>  * *CVE ID:* {{CVE-2022-1471}}
>  * *CWE:* CWE-502 Deserialization of Untrusted Data
>  * {*}Severity{*}: Critical
> {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}}
>  
> {{SnakeYaml's Constructor() class does not restrict types which can be 
> instantiated during deserialization. Deserializing yaml content provided by 
> an attacker can lead to remote code execution. We recommend using SnakeYaml's 
> SafeConsturctor when parsing untrusted content to restrict deserialization.}}
> {{}}
> {{More details : https://nvd.nist.gov/vuln/detail/CVE-2022-1471}}
> {{{}{}}}{{{}{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all

2023-05-26 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726765#comment-17726765
 ] 

Samrat Deb edited comment on FLINK-32176 at 5/27/23 3:31 AM:
-

Please help triggering the workflow for the PR to complete the tests which 
needs approval from maintainers/commiter.


was (Author: samrat007):
Please help triggering the workflow for the PR to complete which needs approval 
from maintainers/commiter.

> [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
> -
>
> Key: FLINK-32176
> URL: https://issues.apache.org/jira/browse/FLINK-32176
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
>
>  
>  * *CVE ID:* {{CVE-2022-1471}}
>  * *CWE:* CWE-502 Deserialization of Untrusted Data
>  * {*}Severity{*}: Critical
> {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}}
>  
> {{SnakeYaml's Constructor() class does not restrict types which can be 
> instantiated during deserialization. Deserializing yaml content provided by 
> an attacker can lead to remote code execution. We recommend using SnakeYaml's 
> SafeConsturctor when parsing untrusted content to restrict deserialization.}}
> {{}}
> {{More details : https://nvd.nist.gov/vuln/detail/CVE-2022-1471}}
> {{{}{}}}{{{}{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all

2023-05-26 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726765#comment-17726765
 ] 

Samrat Deb edited comment on FLINK-32176 at 5/27/23 3:31 AM:
-

Please help triggering the workflow for the PR to complete which needs approval 
from maintainers/commiter.


was (Author: samrat007):
Please help triggering the workflow for the PR to complete which needs approval 
from maintainers/commiter.

> [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
> -
>
> Key: FLINK-32176
> URL: https://issues.apache.org/jira/browse/FLINK-32176
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
>
>  
>  * *CVE ID:* {{CVE-2022-1471}}
>  * *CWE:* CWE-502 Deserialization of Untrusted Data
>  * {*}Severity{*}: Critical
> {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}}
>  
> {{SnakeYaml's Constructor() class does not restrict types which can be 
> instantiated during deserialization. Deserializing yaml content provided by 
> an attacker can lead to remote code execution. We recommend using SnakeYaml's 
> SafeConsturctor when parsing untrusted content to restrict deserialization.}}
> {{}}
> {{More details : https://nvd.nist.gov/vuln/detail/CVE-2022-1471}}
> {{{}{}}}{{{}{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all

2023-05-26 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726765#comment-17726765
 ] 

Samrat Deb commented on FLINK-32176:


Please help triggering the workflow for the PR to complete which needs approval 
from maintainers/commiter.

> [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
> -
>
> Key: FLINK-32176
> URL: https://issues.apache.org/jira/browse/FLINK-32176
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
>
>  
>  * *CVE ID:* {{CVE-2022-1471}}
>  * *CWE:* CWE-502 Deserialization of Untrusted Data
>  * {*}Severity{*}: Critical
> {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}}
>  
> {{SnakeYaml's Constructor() class does not restrict types which can be 
> instantiated during deserialization. Deserializing yaml content provided by 
> an attacker can lead to remote code execution. We recommend using SnakeYaml's 
> SafeConsturctor when parsing untrusted content to restrict deserialization.}}
> {{}}
> {{More details : https://nvd.nist.gov/vuln/detail/CVE-2022-1471}}
> {{{}{}}}{{{}{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32137) Flame graph is hard to use with many task managers

2023-05-26 Thread Vladimir Matveev (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726760#comment-17726760
 ] 

Vladimir Matveev commented on FLINK-32137:
--

Hi [~fanrui], I need to clear this up with my management/open source people, 
but it is likely I will be able to contribute a fix here. I'll try to ping you 
here soon.

> Flame graph is hard to use with many task managers
> --
>
> Key: FLINK-32137
> URL: https://issues.apache.org/jira/browse/FLINK-32137
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
> Attachments: image (1).png, image-2023-05-23-11-01-30-391.png
>
>
> In case there are many task managers executing the same operator, the flame 
> graph becomes very hard to use. As you can see on the attached picture, it 
> considers instances of the same lambda function as different classes, and 
> their number seems to be equal to the number of task managers (i.e. each JVM 
> gets its own "class" name, which is expected for lambdas I guess). This 
> lambda function is deep within Flink's own call stack, so this kind of graph 
> is inevitable regardless of the job's own logic, and there is nothing we can 
> do at the job logic's level to fix it.
> This behavior makes evaluating the flame graph very hard, because all of the 
> useful information gets "compressed" inside each "column" of the graph, and 
> at the same time, it does not give any useful information since this is just 
> an artifact of the class name generation in the JVM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22668: [BP-1.17][FLINK-28853] Document the split level watermark alignment feature an…

2023-05-26 Thread via GitHub


flinkbot commented on PR #22668:
URL: https://github.com/apache/flink/pull/22668#issuecomment-1564848214

   
   ## CI report:
   
   * 60a2646b4caec19e5a3a470f5d7528009d880246 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mas-chen opened a new pull request, #22668: [BP-1.17][FLINK-28853] Document the split level watermark alignment feature an…

2023-05-26 Thread via GitHub


mas-chen opened a new pull request, #22668:
URL: https://github.com/apache/flink/pull/22668

   …d fixup grammar from the configuration table
   
   ## What is the purpose of the change
   
   Update docs for watermark alignment.
   
   
   ## Brief change log
   
   - Update docs for watermark alignment
   - Fixup grammar in Flink configuration 
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32206) ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Omidvar updated FLINK-32206:

Attachment: (was: image (1)-1.png)

> ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch
> 
>
> Key: FLINK-32206
> URL: https://issues.apache.org/jira/browse/FLINK-32206
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Kafka, Connectors / MongoDB
>Affects Versions: 1.17.0
>Reporter: Alireza Omidvar
>Priority: Major
> Attachments: image (1).png, image (2).png
>
>
> Gentlemen,
> I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
> flink and I write test codes in Colab I faced a problem on importing Kafka, 
> Json and FileSystem  modules 
>  
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
> from pyflink.table.catalog import FileSystem 
>  
> not working for me (python version 3.10) 
>  
> Any help is highly appreciated the strange is that other modules importing 
> fine.  I checked with your Github but didn't find these on official version 
> too which means modules are not inside the descriptor.py in newer version. 
>  
> Please see the link below: 
>  
> [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
>  
>  
> I am running a test after producing the stream 
> ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
>  to Confluent server and I like to do a flink job but the above mentioned 
> modules are not found with the following links in collab:
>  
> That is probably an easy fix bug. Only version of apache-flink now working on 
> colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and 
> between different modules found out that Kafka and Json modules are not in 
> descriptors.py of version 1.17 Apache-flink default. But modules exist in 
> Apache-flink 1.13 version.
> [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
> [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
>  
> I've got this error for Json, Kafka ...
> ---
>  
>  ImportError Traceback (most recent call last)  
> in () 1 from pyflink.table import DataTypes > 2 from 
> pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from 
> pyflink.table.catalog import FileSystem ImportError: cannot import name 
> 'Kafka' from 'pyflink.table.descriptors' 
> (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) 
>  
> ---
>  
>  NOTE: If your import is failing due to a missing package, you can manually 
> install dependencies using either !pip or !apt. To view examples of 
> installing some common dependencies, click the "Open Examples" button below.
>  
>  ---
>  
> I have doubt that if current error is related to a version and dependencies 
> then 
>  
> I have to ask the developer if I do this python 3.8 env is that possible to 
> get solved?
>  
>  
> Thanks for your time ,
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32206) ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Omidvar updated FLINK-32206:

Attachment: (was: image (2)-1.png)

> ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch
> 
>
> Key: FLINK-32206
> URL: https://issues.apache.org/jira/browse/FLINK-32206
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Kafka, Connectors / MongoDB
>Affects Versions: 1.17.0
>Reporter: Alireza Omidvar
>Priority: Major
> Attachments: image (1).png, image (2).png
>
>
> Gentlemen,
> I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
> flink and I write test codes in Colab I faced a problem on importing Kafka, 
> Json and FileSystem  modules 
>  
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
> from pyflink.table.catalog import FileSystem 
>  
> not working for me (python version 3.10) 
>  
> Any help is highly appreciated the strange is that other modules importing 
> fine.  I checked with your Github but didn't find these on official version 
> too which means modules are not inside the descriptor.py in newer version. 
>  
> Please see the link below: 
>  
> [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
>  
>  
> I am running a test after producing the stream 
> ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
>  to Confluent server and I like to do a flink job but the above mentioned 
> modules are not found with the following links in collab:
>  
> That is probably an easy fix bug. Only version of apache-flink now working on 
> colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and 
> between different modules found out that Kafka and Json modules are not in 
> descriptors.py of version 1.17 Apache-flink default. But modules exist in 
> Apache-flink 1.13 version.
> [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
> [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
>  
> I've got this error for Json, Kafka ...
> ---
>  
>  ImportError Traceback (most recent call last)  
> in () 1 from pyflink.table import DataTypes > 2 from 
> pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from 
> pyflink.table.catalog import FileSystem ImportError: cannot import name 
> 'Kafka' from 'pyflink.table.descriptors' 
> (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) 
>  
> ---
>  
>  NOTE: If your import is failing due to a missing package, you can manually 
> install dependencies using either !pip or !apt. To view examples of 
> installing some common dependencies, click the "Open Examples" button below.
>  
>  ---
>  
> I have doubt that if current error is related to a version and dependencies 
> then 
>  
> I have to ask the developer if I do this python 3.8 env is that possible to 
> get solved?
>  
>  
> Thanks for your time ,
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32206) ModuleNotFoundError for Pyflink.table.descriptors wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Omidvar updated FLINK-32206:

Attachment: image (1)-1.png
image (2)-1.png
   Component/s: Connectors / Kafka
Connectors / MongoDB
External issue URL: 
https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb
  Language: Python
 Affects Version/s: 1.17.0
(was: mongodb-1.0.1)
   Description: 
Gentlemen,

I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
flink and I write test codes in Colab I faced a problem on importing Kafka, 
Json and FileSystem  modules 
 
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
from pyflink.table.catalog import FileSystem 
 
not working for me (python version 3.10) 
 
Any help is highly appreciated the strange is that other modules importing 
fine.  I checked with your Github but didn't find these on official version too 
which means modules are not inside the descriptor.py in newer version. 
 
Please see the link below: 
 

[https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
 
 
I am running a test after producing the stream 
([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
 to Confluent server and I like to do a flink job but the above mentioned 
modules are not found with the following links in collab:
 
That is probably an easy fix bug. Only version of apache-flink now working on 
colab is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and 
between different modules found out that Kafka and Json modules are not in 
descriptors.py of version 1.17 Apache-flink default. But modules exist in 
Apache-flink 1.13 version.
[https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
[https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
 
I've got this error for Json, Kafka ...
---
 
 ImportError Traceback (most recent call last)  
in () 1 from pyflink.table import DataTypes > 2 from 
pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from 
pyflink.table.catalog import FileSystem ImportError: cannot import name 'Kafka' 
from 'pyflink.table.descriptors' 
(/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) 
 
---
 
 NOTE: If your import is failing due to a missing package, you can manually 
install dependencies using either !pip or !apt. To view examples of installing 
some common dependencies, click the "Open Examples" button below.
 
 ---
 
I have doubt that if current error is related to a version and dependencies 
then 
 
I have to ask the developer if I do this python 3.8 env is that possible to get 
solved?
 
 
Thanks for your time ,
 

  was:
Gentlemen,
 
 
 
I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
flink and I write test codes in Colab I faced a problem for import modules 
 
 
 
 
from pyflink.table import DataTypes 
 
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
 
from pyflink.table.catalog import FileSystem 
 
 
 
 
not working for me (python version 3.10) 
 
 
Any help is highly appreciated the strange is that other modules importing 
fine.  I checked with your Github but didn't find these on yours too which 
means modules are not inside your descriptor.py too. I think it needed 
installation of connectors but it failed too. 
 
 
 
 
Please see the link below: 
 
 
 
 
 
[https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
 
 
 
 
I am running a test after producing the stream 
([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
 to Confluent server and I like to do a flink job but the above mentioned 
modules are not found with the following links in collab:
 
 
That is not probably a bug. Only version of apache-flink now working on colab 
is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between 
different modules found out that Kafka and Json modules are not in 
descriptors.py of version 1.17 Apache-flink default. But modules exist in 
Apache-flink 1.13 version.
[https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
[https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
 
I've got this error for Json, Kafka ...
---
 
 

[GitHub] [flink] pgaref commented on pull request #22663: [FLINK-32198][runtime] Enforce single maxExceptions query parameter

2023-05-26 Thread via GitHub


pgaref commented on PR #22663:
URL: https://github.com/apache/flink/pull/22663#issuecomment-1564774191

   > This is an api-breaking change and can't be merged.
   
   Thanks for taking a look @zentol !
   What the policy for making breaking changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] kurtostfeld commented on pull request #22660: [FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…

2023-05-26 Thread via GitHub


kurtostfeld commented on PR #22660:
URL: https://github.com/apache/flink/pull/22660#issuecomment-1564710396

   @zentol ok, I created a Confluence login with username "kurto", however I 
don't seem to have permissions to create a new FLIP. I don't see a "Create" 
button as the docs say:
   
   > To create your own FLIP, click on "Create" on the header and choose 
"FLIP-Template" other than "Blank page".
   
   Thank you :)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-3154) Update Kryo version from 2.24.0 to latest Kryo LTS version

2023-05-26 Thread Kurt Ostfeld (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-3154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726320#comment-17726320
 ] 

Kurt Ostfeld edited comment on FLINK-3154 at 5/26/23 5:24 PM:
--

[~martijnvisser] this PR upgrades Flink from Kryo v2.x to Kryo v5.x and 
preserves backward compatibility with existing savepoints and checkpoints: 
[https://github.com/apache/flink/pull/22660]

 

This keeps the Kryo v2 project dependency for backwards compatibility only and 
otherwise uses Kryo v5.x.

 

EDIT: All CI tests are passing.


was (Author: JIRAUSER38):
[~martijnvisser] this PR upgrades Flink from Kryo v2.x to Kryo v5.x and 
preserves backward compatibility with existing savepoints and checkpoints: 
[https://github.com/apache/flink/pull/22660]

 

This keeps the Kryo v2 project dependency for backwards compatibility only and 
otherwise uses Kryo v5.x.

 

EDIT: I will fix the CI errors.

> Update Kryo version from 2.24.0 to latest Kryo LTS version
> --
>
> Key: FLINK-3154
> URL: https://issues.apache.org/jira/browse/FLINK-3154
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Priority: Not a Priority
>  Labels: pull-request-available
>
> Flink's Kryo version is outdated and could be updated to a newer version, 
> e.g. kryo-3.0.3.
> From ML: we cannot bumping the Kryo version easily - the serialization format 
> changed (that's why they have a new major version), which would render all 
> Flink savepoints and checkpoints incompatible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded

2023-05-26 Thread Andriy Redko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andriy Redko resolved FLINK-32209.
--
Resolution: Won't Do

> Opensearch connector should remove the dependency on flink-shaded
> -
>
> Key: FLINK-32209
> URL: https://issues.apache.org/jira/browse/FLINK-32209
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Andriy Redko
>Assignee: Andriy Redko
>Priority: Major
> Fix For: opensearch-1.1.0
>
>
> The Opensearch connector depends on flink-shaded. With the externalization of 
> the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded

2023-05-26 Thread Andriy Redko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726691#comment-17726691
 ] 

Andriy Redko commented on FLINK-32209:
--

Already fixed by 
https://github.com/apache/flink-connector-opensearch/commit/85e9cad4f09519543e149530f8a61b2635ca506e

> Opensearch connector should remove the dependency on flink-shaded
> -
>
> Key: FLINK-32209
> URL: https://issues.apache.org/jira/browse/FLINK-32209
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Andriy Redko
>Assignee: Andriy Redko
>Priority: Major
> Fix For: opensearch-1.1.0
>
>
> The Opensearch connector depends on flink-shaded. With the externalization of 
> the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] ewangsdc commented on pull request #3: [FLINK-15462][connectors] Add Trino dialect

2023-05-26 Thread via GitHub


ewangsdc commented on PR #3:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/3#issuecomment-1564695132

   If the new Flink JDBC connector Trino dialect supports this Trino JDBC 
driver, i.e., https://trino.io/docs/current/client/jdbc.html & 
https://github.com/trinodb/docs.trino.io, Kerberos auth options could be added 
as extra options in addition to the current ones, i.e., 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options:~:text=.id%3B-,Connector%20Options,-%23.
   Like Spark/PySpark session JDBC options for Trino Kerberos auth, the extra 
Flink JDBC connection options should include the Kerberos parameters s for the 
above Trino JDBC driver, e.g.,
    
       'kerberos_remote_service_name': "",
       'kerbeors_principal': "",
       'kerberos_usecanonicalhostname': ,
       'driver': 'io.trino.jdbc.TrinoDriver'
    
   Another option is to embed the above Kerberos parameters into the existing 
connection option url, i.e., JDBC database url, as one string, e.g, 
“jdbc:trino:///==False&..",
  and then Flink Trino dialect Java code should parse this url to get the 
Kerberos parameters and pass them to the above Trino JDBC driver .
   
   Thanks,


   
   Emerson


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded

2023-05-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-32209:
---

Assignee: Andriy Redko

> Opensearch connector should remove the dependency on flink-shaded
> -
>
> Key: FLINK-32209
> URL: https://issues.apache.org/jira/browse/FLINK-32209
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Andriy Redko
>Assignee: Andriy Redko
>Priority: Major
> Fix For: opensearch-1.1.0
>
>
> The Opensearch connector depends on flink-shaded. With the externalization of 
> the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on pull request #22666: Update japicmp configuration for 1.16.2

2023-05-26 Thread via GitHub


reswqa commented on PR #22666:
URL: https://github.com/apache/flink/pull/22666#issuecomment-1564661314

   I don't know if it was missed or just for reasons I'm not aware of, it seems 
that the release of `1.16.1` did not update `japicmp`. If I understand 
correctly, we also should update it for `bug-fix` release. For safety reasons, 
could you please help confirm this @MartijnVisser. Thanks a lot!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31944) Protobuf format throw com.google.protobuf.InvalidProtocolBufferException

2023-05-26 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726681#comment-17726681
 ] 

Ryan Skraba commented on FLINK-31944:
-

This is a duplicate of FLINK-32008 – one of the serialized messages is:
{code:java}
0a 03 61 62 63 0a {code}
(Note the first **0a** byte that isn't a newline, and the second **0a** byte 
that was added to the record and probably shouldn't be there.)

> Protobuf format throw com.google.protobuf.InvalidProtocolBufferException
> 
>
> Key: FLINK-31944
> URL: https://issues.apache.org/jira/browse/FLINK-31944
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.0
>Reporter: Xuannan Su
>Priority: Major
> Attachments: flink-protobuf-example.zip
>
>
> It seems that protobuf format throws the following exception when the first 
> field of the message is string type. This may also occur for other types. I 
> uploaded the maven project to reproduce the problem.
>  
> {code:java}
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
>     at 
> org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
>     ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
>     ... 15 more
> Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol 
> message contained an invalid tag (zero).
>     at 
> com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:133)
>     at 
> com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:633)
>     at com.example.proto.Message.(Message.java:47)
>     at com.example.proto.Message.(Message.java:9)
>     at com.example.proto.Message$1.parsePartialFrom(Message.java:540)
>     at com.example.proto.Message$1.parsePartialFrom(Message.java:534)
>     at 
> com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>     at 

[jira] [Comment Edited] (FLINK-32008) Protobuf format cannot work with FileSystem Connector

2023-05-26 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726567#comment-17726567
 ] 

Ryan Skraba edited comment on FLINK-32008 at 5/26/23 4:40 PM:
--

It's probably worth checking in with the community to see what we expect of a 
bulk format, or if it would be an interesting thing to add!  [I'll ask on the 
mailing list|https://lists.apache.org/thread/z9tqdqrhj12c17wqsdbm5fhzonqq5kp0].

I took a quick look but didn't see any related JIRA (outside of FLINK-12149, 
which proposes using the protobuf API to interact with parquet files).  Can a 
committer change the title of this JIRA to better reflect the issue?  Something 
like "Protobuf format on filesystem is faulty"


was (Author: ryanskraba):
It's probably worth checking in with the community to see what we expect of a 
bulk format, or if it would be an interesting thing to add!  I'll ask on the 
mailing list.

I took a quick look but didn't see any related JIRA (outside of FLINK-12149, 
which proposes using the protobuf API to interact with parquet files).  Can a 
committer change the title of this JIRA to better reflect the issue?  Something 
like "Protobuf format on filesystem is faulty"

> Protobuf format cannot work with FileSystem Connector
> -
>
> Key: FLINK-32008
> URL: https://issues.apache.org/jira/browse/FLINK-32008
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.0
>Reporter: Xuannan Su
>Priority: Major
> Attachments: flink-protobuf-example.zip
>
>
> The protobuf format throws exception when working with Map data type. I 
> uploaded a example project to reproduce the problem.
>  
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at 
> 

[jira] [Created] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded

2023-05-26 Thread Andriy Redko (Jira)
Andriy Redko created FLINK-32209:


 Summary: Opensearch connector should remove the dependency on 
flink-shaded
 Key: FLINK-32209
 URL: https://issues.apache.org/jira/browse/FLINK-32209
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Opensearch
Affects Versions: opensearch-1.0.1
Reporter: Andriy Redko
 Fix For: opensearch-1.1.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32209) Opensearch connector should remove the dependency on flink-shaded

2023-05-26 Thread Andriy Redko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andriy Redko updated FLINK-32209:
-
Description: The Opensearch connector depends on flink-shaded. With the 
externalization of the connector, the connectors shouldn't rely on Flink-Shaded

> Opensearch connector should remove the dependency on flink-shaded
> -
>
> Key: FLINK-32209
> URL: https://issues.apache.org/jira/browse/FLINK-32209
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Andriy Redko
>Priority: Major
> Fix For: opensearch-1.1.0
>
>
> The Opensearch connector depends on flink-shaded. With the externalization of 
> the connector, the connectors shouldn't rely on Flink-Shaded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32200) OrcFileSystemITCase cashed with exit code 239 (NoClassDefFoundError: scala/concurrent/duration/Deadline)

2023-05-26 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-32200:

Labels: test-stability  (was: )

> OrcFileSystemITCase cashed with exit code 239 (NoClassDefFoundError: 
> scala/concurrent/duration/Deadline)
> 
>
> Key: FLINK-32200
> URL: https://issues.apache.org/jira/browse/FLINK-32200
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49325=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=12302
> {code}
> 12:24:14,883 [flink-akka.actor.internal-dispatcher-2] ERROR 
> org.apache.flink.util.FatalExitExceptionHandler  [] - FATAL: 
> Thread 'flink-akka.actor.internal-dispatcher-2' produced an uncaught 
> exception. Stopping the process...
> java.lang.NoClassDefFoundError: scala/concurrent/duration/Deadline
> at scala.concurrent.duration.Deadline$.apply(Deadline.scala:30) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at scala.concurrent.duration.Deadline$.now(Deadline.scala:76) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.actor.CoordinatedShutdown.loop$1(CoordinatedShutdown.scala:737) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.actor.CoordinatedShutdown.$anonfun$run$7(CoordinatedShutdown.scala:762) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
>  ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
>  ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>  [flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
> [?:1.8.0_292]
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
> [?:1.8.0_292]
> at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
> [?:1.8.0_292]
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
> [?:1.8.0_292]
> 12:24:14,882 [flink-metrics-akka.actor.internal-dispatcher-2] ERROR 
> org.apache.flink.util.FatalExitExceptionHandler  [] - FATAL: 
> Thread 'flink-metrics-akka.actor.internal-dispatcher-2' produced an uncaught 
> exception. Stopping the process...
> java.lang.NoClassDefFoundError: scala/concurrent/duration/Deadline
> at scala.concurrent.duration.Deadline$.apply(Deadline.scala:30) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at scala.concurrent.duration.Deadline$.now(Deadline.scala:76) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.actor.CoordinatedShutdown.loop$1(CoordinatedShutdown.scala:737) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 
> akka.actor.CoordinatedShutdown.$anonfun$run$7(CoordinatedShutdown.scala:762) 
> ~[flink-rpc-akka_318674dc-e98c-4e16-8705-faefda52bd1a.jar:1.18-SNAPSHOT]
> at 

[GitHub] [flink-connector-aws] snuyanzin commented on pull request #75: [FLINK-32208] Remove dependency on flink-shaded from flink-connector-aws

2023-05-26 Thread via GitHub


snuyanzin commented on PR #75:
URL: 
https://github.com/apache/flink-connector-aws/pull/75#issuecomment-1564624709

   hm... that's weird...
   almost same command passes locally
   ```
   mvn clean install -DskipTests -Dflink.convergence.phase=install 
-Pcheck-convergence -U -B \
   -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 \
   
-DaltDeploymentRepository=validation_repository::default::file:/tmp/flink-validation-deployment
 \
   -Dflink.version=1.16.1
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


WencongLiu commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206929777


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala:
##
@@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object JoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   Sorry for the misunderstanding. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32206) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Omidvar updated FLINK-32206:

Attachment: image (2).png
image (1).png

> Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version 
> mismatch
> --
>
> Key: FLINK-32206
> URL: https://issues.apache.org/jira/browse/FLINK-32206
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: mongodb-1.0.1
>Reporter: Alireza Omidvar
>Priority: Major
> Attachments: image (1).png, image (2).png
>
>
> Gentlemen,
>  
>  
>  
> I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
> flink and I write test codes in Colab I faced a problem for import modules 
>  
>  
>  
>  
> from pyflink.table import DataTypes 
>  
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
>  
> from pyflink.table.catalog import FileSystem 
>  
>  
>  
>  
> not working for me (python version 3.10) 
>  
>  
> Any help is highly appreciated the strange is that other modules importing 
> fine.  I checked with your Github but didn't find these on yours too which 
> means modules are not inside your descriptor.py too. I think it needed 
> installation of connectors but it failed too. 
>  
>  
>  
>  
> Please see the link below: 
>  
>  
>  
>  
>  
> [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
>  
>  
>  
>  
> I am running a test after producing the stream 
> ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
>  to Confluent server and I like to do a flink job but the above mentioned 
> modules are not found with the following links in collab:
>  
>  
> That is not probably a bug. Only version of apache-flink now working on colab 
> is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between 
> different modules found out that Kafka and Json modules are not in 
> descriptors.py of version 1.17 Apache-flink default. But modules exist in 
> Apache-flink 1.13 version.
> [https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
> [https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
>  
> I've got this error for Json, Kafka ...
> ---
>  
>  ImportError Traceback (most recent call last)  
> in () 1 from pyflink.table import DataTypes > 2 from 
> pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from 
> pyflink.table.catalog import FileSystem ImportError: cannot import name 
> 'Kafka' from 'pyflink.table.descriptors' 
> (/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) 
>  
> ---
>  
>  NOTE: If your import is failing due to a missing package, you can manually 
> install dependencies using either !pip or !apt. To view examples of 
> installing some common dependencies, click the "Open Examples" button below.
>  
>  ---
>  
> I have doubt that if current error is related to a version and dependencies 
> then 
>  
> I have to ask the developer if I do this python 3.8 env is that possible to 
> get solved?
>  
>  
> Thanks for your time ,
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32208) Remove dependency on flink-shaded from flink-connector-aws

2023-05-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32208:
---
Labels: pull-request-available  (was: )

> Remove dependency on flink-shaded from flink-connector-aws
> --
>
> Key: FLINK-32208
> URL: https://issues.apache.org/jira/browse/FLINK-32208
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / AWS, Connectors / DynamoDB
>Affects Versions: aws-connector-4.2.0
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> The AWS connectors depend on flink-shaded. With the externalization of 
> connector, connectors shouldn't rely on Flink-Shaded but instead shade 
> dependencies such as this one themselves



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] boring-cyborg[bot] commented on pull request #75: [FLINK-32208] Remove dependency on flink-shaded from flink-connector-aws

2023-05-26 Thread via GitHub


boring-cyborg[bot] commented on PR #75:
URL: 
https://github.com/apache/flink-connector-aws/pull/75#issuecomment-1564515776

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32206) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alireza Omidvar updated FLINK-32206:

Description: 
Gentlemen,
 
 
 
I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
flink and I write test codes in Colab I faced a problem for import modules 
 
 
 
 
from pyflink.table import DataTypes 
 
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
 
from pyflink.table.catalog import FileSystem 
 
 
 
 
not working for me (python version 3.10) 
 
 
Any help is highly appreciated the strange is that other modules importing 
fine.  I checked with your Github but didn't find these on yours too which 
means modules are not inside your descriptor.py too. I think it needed 
installation of connectors but it failed too. 
 
 
 
 
Please see the link below: 
 
 
 
 
 
[https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
 
 
 
 
I am running a test after producing the stream 
([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
 to Confluent server and I like to do a flink job but the above mentioned 
modules are not found with the following links in collab:
 
 
That is not probably a bug. Only version of apache-flink now working on colab 
is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between 
different modules found out that Kafka and Json modules are not in 
descriptors.py of version 1.17 Apache-flink default. But modules exist in 
Apache-flink 1.13 version.
[https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
[https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
 
I've got this error for Json, Kafka ...
---
 
 ImportError Traceback (most recent call last)  
in () 1 from pyflink.table import DataTypes > 2 from 
pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from 
pyflink.table.catalog import FileSystem ImportError: cannot import name 'Kafka' 
from 'pyflink.table.descriptors' 
(/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) 
 
---
 
 NOTE: If your import is failing due to a missing package, you can manually 
install dependencies using either !pip or !apt. To view examples of installing 
some common dependencies, click the "Open Examples" button below.
 
 ---
 
I have doubt that if current error is related to a version and dependencies 
then 
 
I have to ask the developer if I do this python 3.8 env is that possible to get 
solved?
 
 
Thanks for your time ,
 

> Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version 
> mismatch
> --
>
> Key: FLINK-32206
> URL: https://issues.apache.org/jira/browse/FLINK-32206
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: mongodb-1.0.1
>Reporter: Alireza Omidvar
>Priority: Major
>
> Gentlemen,
>  
>  
>  
> I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
> flink and I write test codes in Colab I faced a problem for import modules 
>  
>  
>  
>  
> from pyflink.table import DataTypes 
>  
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
>  
> from pyflink.table.catalog import FileSystem 
>  
>  
>  
>  
> not working for me (python version 3.10) 
>  
>  
> Any help is highly appreciated the strange is that other modules importing 
> fine.  I checked with your Github but didn't find these on yours too which 
> means modules are not inside your descriptor.py too. I think it needed 
> installation of connectors but it failed too. 
>  
>  
>  
>  
> Please see the link below: 
>  
>  
>  
>  
>  
> [https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
>  
>  
>  
>  
> I am running a test after producing the stream 
> ([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
>  to Confluent server and I like to do a flink job but the above mentioned 
> modules are not found with the following links in collab:
>  
>  
> That is not probably a bug. Only version of apache-flink now working on colab 
> is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between 
> different modules found out that Kafka and Json modules are not in 
> descriptors.py of version 1.17 Apache-flink default. But modules exist in 
> Apache-flink 1.13 version.
> 

[jira] [Created] (FLINK-32208) Remove dependency on flink-shaded from flink-connector-aws

2023-05-26 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32208:
---

 Summary: Remove dependency on flink-shaded from flink-connector-aws
 Key: FLINK-32208
 URL: https://issues.apache.org/jira/browse/FLINK-32208
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / AWS, Connectors / DynamoDB
Affects Versions: aws-connector-4.2.0
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


The AWS connectors depend on flink-shaded. With the externalization of 
connector, connectors shouldn't rely on Flink-Shaded but instead shade 
dependencies such as this one themselves



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] mxm merged pull request #22531: [FLINK-28853] Document the split level watermark alignment feature an…

2023-05-26 Thread via GitHub


mxm merged PR #22531:
URL: https://github.com/apache/flink/pull/22531


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-05-26 Thread via GitHub


XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206745426


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -65,33 +71,46 @@ public void open(
 this.checkpointedState == null,
 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-this.checkpointedState =
-context.getOperatorStateStore()
-.getListState(
-new ListStateDescriptor<>(
-name + "-sequence-state", 
LongSerializer.INSTANCE));
-this.valuesToEmit = new ArrayDeque<>();
-if (context.isRestored()) {
-// upon restoring
+ListStateDescriptor stateDescriptor =
+new ListStateDescriptor<>(
+name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+this.internalStates = Lists.newArrayList();
 
-for (Long v : this.checkpointedState.get()) {
-this.valuesToEmit.add(v);
-}
+if (context.isRestored()) {
+checkpointedState.get().forEach(state -> 
internalStates.add(state));
 } else {
-// the first time the job is executed
-final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+// The first time the job is executed.
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-final long congruence = start + taskIdx;
-
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+InternalState state = new InternalState(taskIdx, stepSize, start + 
taskIdx);
+internalStates.add(state);
+}
+}
 
-for (long collected = 0; collected < toCollect; collected++) {
-this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+Iterator iterator = internalStates.iterator();

Review Comment:
   Thanks for sharing your view, @RyanSkraba. I think you're right - the 
implementation in master (i.e. without this PR's change) doesn't support 
reducing the parallelism properly, too. 
   
   > Avoid performance skew after reducing parallelism. Currently, going from N 
tasks to N-1 tasks will double the length of time it takes, since one task will 
have twice as many values to emit as the others. This is a undesirable 
consequence of this PR, but can probably also be discussed and implemented in a 
subsequent step.
   
   I don't understand that part: Isn't the performance skew issue you're 
describing more general and not rely connected to the `DataGenerator`? The skew 
you're mentioning is happening in any case when reducing the parallelism. It's 
more of a question how Flink would distribute the operator state after the 
parallelism was reduced. :thinking: Therefore, I don't see this as a problem 
for this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22341: [FLINK-27204] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-05-26 Thread via GitHub


XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1206775003


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,69 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
+ * @return CompletableFuture with the completed state.
  * @throws IOException if the creation of the dirty result failed for IO 
reasons.
  * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
  * attached that is already registered in this {@code JobResultStore}.
  */
-void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, 
IllegalStateException;
+CompletableFuture createDirtyResultAsync(JobResultEntry 
jobResultEntry);
 
 /**
  * Marks an existing {@link JobResultEntry} as {@code clean}. This 
indicates that no more
  * resource cleanup steps need to be performed. No actions should be 
triggered if the passed
  * {@code JobID} belongs to a job that was already marked as clean.
  *
  * @param jobId Ident of the job we wish to mark as clean.
+ * @return CompletableFuture with the completed state.
  * @throws IOException if marking the {@code dirty} {@code JobResultEntry} 
as {@code clean}
  * failed for IO reasons.
  * @throws NoSuchElementException if there is no corresponding {@code 
dirty} job present in the
  * store for the given {@code JobID}.

Review Comment:
   The `IOException` can be removed from the javaDoc now. It's going to be 
wrapped by the `CompletableFuture`. But we might want to mentioned the 
`NoSuchElementException` as part of the return value's documentation, still.



##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,69 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
+ * @return CompletableFuture with the completed state.
  * @throws IOException if the creation of the dirty result failed for IO 
reasons.
  * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
  * attached that is already registered in this {@code JobResultStore}.

Review Comment:
   The IOException can be removed from the javaDoc now. It's going to be 
wrapped by the CompletableFuture. But we might want to mentioned the 
`IllegalStateException` as part of the return value's documentation, still.



##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##
@@ -44,64 +45,84 @@ public abstract class AbstractThreadsafeJobResultStore 
implements JobResultStore
 private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
 @Override
-public void createDirtyResult(JobResultEntry jobResultEntry) throws 
IOException {
-Preconditions.checkState(
-!hasJobResultEntry(jobResultEntry.getJobId()),
-"Job result store already contains an entry for job %s",
-jobResultEntry.getJobId());
-
-withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+public CompletableFuture createDirtyResultAsync(JobResultEntry 
jobResultEntry) {
+return hasJobResultEntryAsync(jobResultEntry.getJobId())
+.handle(
+(hasResult, error) -> {
+if (error != null || hasResult) {
+ExceptionUtils.rethrow(error);
+}
+try {
+withWriteLock(() -> 
createDirtyResultInternal(jobResultEntry));

Review Comment:
   You have to be careful here: The internal methods are now only triggering 
the actual logic in the `ioExecutor`. Therefore, only submitting the task is 
guarded under the lock. You're losing the synchronization of the IO tasks which 
will run in multiple threads of the ioExecutor for the 
`FileSystemJobResultStore` implementation.



##

[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Priority: Minor  (was: Major)

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Minor
>  Labels: pull-request-available
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *How to reproduce*
> Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job 
> -> in Task Manager dump you should see multiple Log4jThreads
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] echauchot commented on pull request #22667: [FLINK-31806] Fix public API rule for connectors

2023-05-26 Thread via GitHub


echauchot commented on PR #22667:
URL: https://github.com/apache/flink/pull/22667#issuecomment-1564460268

   Tested it on Cassandra connector and it reports these violations:
   
   1. If I put back IOUtils use in the connector it raises 
   
   `Method 
 calls method 
 in 
(CassandraEnumeratorStateSerializer.java:90)
   `
   => This is what we wanted
   
   2. On current code base, it raises:
   `java.lang.AssertionError: Architecture Violation [Priority: MEDIUM] - Rule 
'Connector production code must not depend on non-public API outside of 
connector packages' was violated (16 times):
   Constructor 
(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
 in 
(CassandraSource.java:138)
   Constructor 
(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
 in (CassandraSource.java:124)
   Constructor 
(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
 in (CassandraSource.java:125)
   Constructor 
(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
 in (CassandraSource.java:126)
   Constructor 
(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method 
 in (CassandraSource.java:127)
   Constructor 
(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder,
 long, java.lang.Class, java.lang.String, 
org.apache.flink.streaming.connectors.cassandra.MapperOptions)> gets field 
 in 
(CassandraSource.java:138)
   Field 

 has generic type 
> with 
type argument depending on  
in (CassandraSplitSerializer.java:0)
   Method 

 calls method  in (CassandraSource.java:145)
   Method 

 calls method  in (CassandraSource.java:149)
   Method 

 is annotated with  in 
(CassandraSource.java:0)
   Method 
 is annotated with 
 in (CassandraSplitReader.java:0)
   Method 
 calls constructor 
([B)> in 
(CassandraSplitSerializer.java:57)
   Method 

 calls method  in 
(CassandraSplitSerializer.java:51)
   Method 

 calls method 
 in 
(CassandraSplitSerializer.java:50)
   Method 

 is annotated with  in 
(SplitsGenerator.java:0)
   Static Initializer 
()>
 calls constructor 
(int)> in 
(CassandraSplitSerializer.java:34)
   `
   => These are calls to ClosureCleaner, checkState and checkNotNull, 
VisibleForTesting which should be added as violation exceptions
   => The only discutable part is the dependency on 
`org.apache.flink.core.memory.DataInputDeserializer` and 
`org.apache.flink.core.memory.DataOutputSerializer` which are internal APIs but 
we could definitely use [ByteArray | Object]Streams JDK counterparts instead in 
the Cassandra code.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32207) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)
Alireza Omidvar created FLINK-32207:
---

 Summary: Error ModuleNotFoundError for Pyflink.table.descriptors 
and wheel version mismatch
 Key: FLINK-32207
 URL: https://issues.apache.org/jira/browse/FLINK-32207
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.17.0
 Environment: Colab

Python3.10 
Reporter: Alireza Omidvar
 Attachments: image (1).png, image (2).png

Gentlemen,
 
 
 
I have problem with some apache-flink modules. I am running a 1.17.0 apache- 
flink and I write test codes in Colab I faced a problem for import modules 
 
 
 
 
from pyflink.table import DataTypes 
 
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 
 
from pyflink.table.catalog import FileSystem 
 
 
 
 
not working for me (python version 3.10) 
 
 
Any help is highly appreciated the strange is that other modules importing 
fine.  I checked with your Github but didn't find these on yours too which 
means modules are not inside your descriptor.py too. I think it needed 
installation of connectors but it failed too. 
 
 
 
 
Please see the link below: 
 
 
 
 
 
[https://github.com/aomidvar/scrapper-price-comparison/blob/d8a10f74101bf96974e769813c33b83d7a71f02b/kafkaconsumer1.ipynb]
 
 
 
 
I am running a test after producing the stream 
([https://github.com/aomidvar/scrapper-price-comparison/blob/main/kafkaproducer1.ipynb])
 to Confluent server and I like to do a flink job but the above mentioned 
modules are not found with the following links in collab:
 
 
That is not probably a bug. Only version of apache-flink now working on colab 
is 1.17.0. I prefer 3.10 but installed a virtual python 3.8 env and between 
different modules found out that Kafka and Json modules are not in 
descriptors.py of version 1.17 Apache-flink default. But modules exist in 
Apache-flink 1.13 version.
[https://colab.research.google.com/drive/1aHKv8WA6RA10zTdwdzUubB5K0anEmOws?usp=sharing]
[https://colab.research.google.com/drive/1eCHJlsb8AjdmJtPc95X3H4btmFVSoCL4?usp=sharing]
 
I've got this error for Json, Kafka ...
---
 
 ImportError Traceback (most recent call last)  
in () 1 from pyflink.table import DataTypes > 2 from 
pyflink.table.descriptors import Schema, Kafka, Json, Rowtime 3 from 
pyflink.table.catalog import FileSystem ImportError: cannot import name 'Kafka' 
from 'pyflink.table.descriptors' 
(/usr/local/lib/python3.10/dist-packages/pyflink/table/descriptors.py) 
 
---
 
 NOTE: If your import is failing due to a missing package, you can manually 
install dependencies using either !pip or !apt. To view examples of installing 
some common dependencies, click the "Open Examples" button below.
 
 ---
 
I have doubt that if current error is related to a version and dependencies 
then 
 
I have to ask the developer if I do this python 3.8 env is that possible to get 
solved?
 
 
Thanks for your time ,
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32206) Error ModuleNotFoundError for Pyflink.table.descriptors and wheel version mismatch

2023-05-26 Thread Alireza Omidvar (Jira)
Alireza Omidvar created FLINK-32206:
---

 Summary: Error ModuleNotFoundError for Pyflink.table.descriptors 
and wheel version mismatch
 Key: FLINK-32206
 URL: https://issues.apache.org/jira/browse/FLINK-32206
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: mongodb-1.0.1
Reporter: Alireza Omidvar






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] RyanSkraba commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-05-26 Thread via GitHub


RyanSkraba commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206825892


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -65,33 +71,46 @@ public void open(
 this.checkpointedState == null,
 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-this.checkpointedState =
-context.getOperatorStateStore()
-.getListState(
-new ListStateDescriptor<>(
-name + "-sequence-state", 
LongSerializer.INSTANCE));
-this.valuesToEmit = new ArrayDeque<>();
-if (context.isRestored()) {
-// upon restoring
+ListStateDescriptor stateDescriptor =
+new ListStateDescriptor<>(
+name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+this.internalStates = Lists.newArrayList();
 
-for (Long v : this.checkpointedState.get()) {
-this.valuesToEmit.add(v);
-}
+if (context.isRestored()) {
+checkpointedState.get().forEach(state -> 
internalStates.add(state));
 } else {
-// the first time the job is executed
-final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+// The first time the job is executed.
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-final long congruence = start + taskIdx;
-
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+InternalState state = new InternalState(taskIdx, stepSize, start + 
taskIdx);
+internalStates.add(state);
+}
+}
 
-for (long collected = 0; collected < toCollect; collected++) {
-this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+Iterator iterator = internalStates.iterator();

Review Comment:
   @xuzhiwen1255 This sounds perfect!
   > But I have an idea, that is, for each subtask, we let him generate more 
InternalState
   
   That is to say, instead of having N internal states when there are N tasks 
initially, we would have 2N, or 10N, or 100N.  That number doesn't have to be 
extremely high to avoid skew here, and still benefit from the "concise" and 
efficient InternalState you've implemented here.
   
   I think there might be some cool way of splitting up the ranges differently, 
but I'd like to think about it!  This should probably go on the future JIRA 
though :D 
   
   @XComp Yes, this issue would happen to any source that created "one split 
per task" on initialization, where the split corresponds to the operator state 
-- but the previous implementation didn't do that.  It was one state per 
element to emit, so it _could_ be perfectly rebalanced (but overall pretty 
inefficiently).
   
   I agree that it's not a problem for this PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


reswqa commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206813779


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala:
##
@@ -470,7 +469,7 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object SemiJoinITCase {
-  @Parameterized.Parameters(name = "{0}-{1}")
+  @Parameters(name = "{0}-{1}")

Review Comment:
   Then we should fix it. Please try to give it a more appropriate name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


reswqa commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206804563


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala:
##
@@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object JoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   I don't quite understand why modifying the strings(value of name) doesn't go 
through compilation? What you should modify is the value of name, not itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22667: [FLINK-31806] Fix public API rule for connectors

2023-05-26 Thread via GitHub


flinkbot commented on PR #22667:
URL: https://github.com/apache/flink/pull/22667#issuecomment-1564407238

   
   ## CI report:
   
   * 21e34a00657502a6a502e436152d5cb01aad4872 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31806) Prod architecture tests didn't detect non-public API usage

2023-05-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31806:
---
Labels: pull-request-available  (was: )

> Prod architecture tests didn't detect non-public API usage
> --
>
> Key: FLINK-31806
> URL: https://issues.apache.org/jira/browse/FLINK-31806
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Cassandra, Tests
>Affects Versions: cassandra-3.0.0, 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Etienne Chauchot
>Priority: Critical
>  Labels: pull-request-available
>
> FLINK-31805 wasn't detected by the production architecture tests.
> Not sure if this is an issue on the cassandra or Flink side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726626#comment-17726626
 ] 

Oleksandr Nitavskyi commented on FLINK-32203:
-

[~chesnay] thanks for looking into PR 
(https://github.com/apache/flink/pull/22664). You can see in attach an example 
of the stack trace, which we get when Log4jThread is being created.

We have run the job and were killing one of JobManager to rely on HA and 
trigger the job restart.
During debug of the Log4jThread creation we saw that in StackTrace there are 
Presto (for checkpoint) or Hadoop S3A (to write output on S3) FileSystems, 
which are loaded from Plugin Classloader. (example stack trace is attached)

Do you know if a plugin Classloader instance is created per job, when a job is 
being created? If yes, probably this instance is being passed to 
Log4jContextFactory and thus a new Log4j subsystem being created.

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *How to reproduce*
> Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job 
> -> in Task Manager dump you should see multiple Log4jThreads
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] matriv commented on pull request #29: [FLINK-31551] Add support for CrateDB

2023-05-26 Thread via GitHub


matriv commented on PR #29:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/29#issuecomment-1564380946

   @MartijnVisser @snuyanzin I've rebased again, can you please run ci again?
   It would be great to merge it if everything passes, thx.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22010: [FLINK-31192][connectors/dataGen] Fix dataGen takes too long to initi…

2023-05-26 Thread via GitHub


XComp commented on code in PR #22010:
URL: https://github.com/apache/flink/pull/22010#discussion_r1206745426


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -65,33 +71,46 @@ public void open(
 this.checkpointedState == null,
 "The " + getClass().getSimpleName() + " has already been 
initialized.");
 
-this.checkpointedState =
-context.getOperatorStateStore()
-.getListState(
-new ListStateDescriptor<>(
-name + "-sequence-state", 
LongSerializer.INSTANCE));
-this.valuesToEmit = new ArrayDeque<>();
-if (context.isRestored()) {
-// upon restoring
+ListStateDescriptor stateDescriptor =
+new ListStateDescriptor<>(
+name + "-sequence-state", 
TypeInformation.of(InternalState.class));
+this.checkpointedState = 
context.getOperatorStateStore().getListState(stateDescriptor);
+this.internalStates = Lists.newArrayList();
 
-for (Long v : this.checkpointedState.get()) {
-this.valuesToEmit.add(v);
-}
+if (context.isRestored()) {
+checkpointedState.get().forEach(state -> 
internalStates.add(state));
 } else {
-// the first time the job is executed
-final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+// The first time the job is executed.
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
-final long congruence = start + taskIdx;
-
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+final long stepSize = runtimeContext.getNumberOfParallelSubtasks();
+InternalState state = new InternalState(taskIdx, stepSize, start + 
taskIdx);
+internalStates.add(state);
+}
+}
 
-for (long collected = 0; collected < toCollect; collected++) {
-this.valuesToEmit.add(collected * stepSize + congruence);
+public Long nextValue() {
+Iterator iterator = internalStates.iterator();

Review Comment:
   Thanks for sharing your view, @RyanSkraba. I think you're right - the 
implementation in master (i.e. without this PR's change) doesn't support 
reducing the parallelism properly, too. 
   ```
   Avoid performance skew after reducing parallelism. Currently, going from N 
tasks to N-1 tasks will double the length of time it takes, since one task will 
have twice as many values to emit as the others. This is a undesirable 
consequence of this PR, but can probably also be discussed and implemented in a 
subsequent step.
   ```
   I don't understand that part: Isn't the performance skew issue you're 
describing more general and not rely connected to the `DataGenerator`? The skew 
you're mentioning is happening in any case when reducing the parallelism. It's 
more of a question how Flink would distribute the operator state after the 
parallelism was reduced. :thinking: Therefore, I don't see this as a problem 
for this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


WencongLiu commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206741341


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/OuterJoinITCase.scala:
##
@@ -398,7 +398,7 @@ class OuterJoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object OuterJoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   Same reason with before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


WencongLiu commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206740602


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala:
##
@@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object JoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   If the "name" is modified to "expectedJoinType". It can't be compiled 
successfully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


WencongLiu commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206740122


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala:
##
@@ -26,23 +26,31 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, 
InMemoryLookupableTableSource}
 import 
org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat
 import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
+import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, 
ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
 
 import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assumptions.assumeThat
 import org.assertj.core.api.IterableAssert.assertThatIterable
-import org.junit.{After, Assume, Before, Test}
-import org.junit.Assert.assertEquals
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestTemplate}
+import org.junit.jupiter.api.Assertions.assertEquals

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


WencongLiu commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206739358


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala:
##
@@ -470,7 +469,7 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object SemiJoinITCase {
-  @Parameterized.Parameters(name = "{0}-{1}")
+  @Parameters(name = "{0}-{1}")

Review Comment:
   Yes. There is one parameter. 



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala:
##
@@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object JoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   If the "name" is modified to "expectedJoinType". It can't be compiled 
successfully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP

2023-05-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726616#comment-17726616
 ] 

Matthias Pohl commented on FLINK-32204:
---

FYI: I couldn't reproduce the error locally with 1 test executions.

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with 
> The ExecutorService is shut down already. No Callables can be executed on AZP
> ---
>
> Key: FLINK-32204
> URL: https://issues.apache.org/jira/browse/FLINK-32204
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095]
>  fails as
> {noformat}
> May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: 
> The ExecutorService is shut down already. No Callables can be executed.
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237)
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
> May 25 18:45:50
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP

2023-05-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726613#comment-17726613
 ] 

Matthias Pohl commented on FLINK-32204:
---

The curator's {{TreeCache}} isn't thread-safe: The event processing within the 
cache happens is triggered from within the client's EventThread and calls 
{{TreeCache#publishEvent}}:
{code}
[...]
May 25 18:45:50 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
May 25 18:45:50 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
[...]
May 25 18:45:50 at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634)
May 25 18:45:50 at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
[...]
{code}
{{TreeCache#publishEvent}} will submit a new task to the cache's 
{{executorService}} (see 
[TreeCache:901|https://github.com/apache/curator/blob/844c0ad36340b695b2784489c078cfd78522143c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java#L901])
 which is a {{directExecutorService}} in the case of Flink's ZooKeeper 
LeaderElectionDriver implementations (see 
[ZooKeeperUtils:764|https://github.com/apache/flink/blob/4576e4384ff36623712043564039f654c3b44a30/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L764]
 for the legacy {{ZooKeeperLeaderElectionDriver}} and 
[ZooKeeperMultipleComponentLeaderElectionDriver:76|https://github.com/apache/flink/blob/8ddfd590ebba7fc727e79db41b82d3d40a02b56a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java#L76]).
 The close call happens in the test's main thread.

The {{TreeCache#close}} call sets the cache's state to {{CLOSED}} in 
[TreeCache:628|https://github.com/apache/curator/blob/844c0ad36340b695b2784489c078cfd78522143c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java#L628].
 {{TreeCache#publishEvent}} checks this state in 
[TreeCache:898|https://github.com/apache/curator/blob/844c0ad36340b695b2784489c078cfd78522143c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java#L898].
 The latter one doesn't use a lock. This can cause a race condition where the 
{{publishEvent}} method is called and passes the if condition before the test's 
main thread can trigger the close method but after the task is actually 
submitted causing the {{RejectedExecutionException}} which we're observing 
right now.

[~dmvk] may you verify my finding? I would suggest adding a less restrictive 
version of the {{DirectExecutorService}} that we could use in the production 
code to avoid running into this bug. We could continue to use the more 
restrictive version (which was introduced in FLINK-31995) in the test code. 
WDYT David?

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with 
> The ExecutorService is shut down already. No Callables can be executed on AZP
> ---
>
> Key: FLINK-32204
> URL: https://issues.apache.org/jira/browse/FLINK-32204
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095]
>  fails as
> {noformat}
> May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: 
> The ExecutorService is shut down already. No Callables can be executed.
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237)
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79)
> May 25 18:45:50   at 
> 

[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen edited comment on FLINK-32188 at 5/26/23 12:32 PM:


[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values. In which module is this processed?Perhaps it should be in the code 
before ‘PushFilterIntoTableSourceScanRule’, but where is it?
I think this verification logic may need to be fixed for array types.


was (Author: JIRAUSER298666):
[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values. In which module is this processed?Perhaps it should be in the code 
before ‘PushFilterIntoTableSourceScanRule’, but where is it?
I think it need to be fixed for array types.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. 

[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen edited comment on FLINK-32188 at 5/26/23 12:30 PM:


[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values. In which module is this processed?Perhaps it should be in the code 
before ‘PushFilterIntoTableSourceScanRule’, but where is it?
I think it need to be fixed for array types.


was (Author: JIRAUSER298666):
[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values. In which module is this processed?I think it need to be fixed.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 

[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen edited comment on FLINK-32188 at 5/26/23 12:26 PM:


[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values. In which module is this processed?I think it need to be fixed.


was (Author: JIRAUSER298666):
[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the 
> corresponding 

[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen edited comment on FLINK-32188 at 5/26/23 12:24 PM:


[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select * from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.


was (Author: JIRAUSER298666):
[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select image, errorCode from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it 

[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen edited comment on FLINK-32188 at 5/26/23 12:23 PM:


[~jark]  Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select image, errorCode from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.


was (Author: JIRAUSER298666):
[~jark]Jark Wu Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select image, errorCode from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. 

[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen commented on FLINK-32188:
--

Jark Wu Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select image, errorCode from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726609#comment-17726609
 ] 

Xin Chen edited comment on FLINK-32188 at 5/26/23 12:22 PM:


[~jark]Jark Wu Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select image, errorCode from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.


was (Author: JIRAUSER298666):
Jark Wu Thank you for your reply!So should I submit a PR about this in 
github?Or the better way to fix this?

When I fix the issue through the method I introduced above,and url is defined 
as an array,I can obtain predicates successfully.
But I found another issue in my debugging process at the same time,when url is 
defined as string not an array,sql like this returned no results:
{code:java}
String s1_and = "select * from image_source " +
"where url = 'aaa.jpg'" +
"and url = 'bbb.jpg'";
{code}
This is because ‘url‘ cannot be assigned two different values simultaneously. I 
think this is logical. But when I defined it as an array type, sql like this 
returned two records unexpectedly! 

{code:java}
String s3_and = "select image, errorCode from image_source where " +
"url = ARRAY['aaa.jpg', 'bbb.jpg'] and url = ARRAY['ccc.jpg', 
'ddd.jpg']";
{code}
 I debugged and found that the task has entered the 
*PushFilterIntoTableSourceScanRule *of org. apache. Flink. table. planner. 
plan. rules. logical, but when url is a string,the task of “url = xxx and url = 
yyy” didn't enter 'onMatch' method of *PushFilterIntoTableSourceScanRule*. 

 !screenshot-8.png! 
{code:java}
// filter:
rel#100:LogicalFilter.NONE.any.None: 
0.[NONE].[NONE](input=HepRelVertex#99,condition=
AND(=($0, CAST(ARRAY(_UTF-16LE'aaa.jpg', 
_UTF-16LE'bbb.jpg')):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ARRAY NOT 
NULL), 
=($0, CAST(ARRAY(_UTF-16LE'ccc.jpg', _UTF-16LE'ddd.jpg')):VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ARRAY NOT NULL)))
{code}

I am confused how the frame work to verify that a field cannot be equal to two 
values at the same time. In which module is this processed?I think it need to 
be fixed.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. 

[jira] [Updated] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Xin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Chen updated FLINK-32188:
-
Attachment: screenshot-8.png

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png, screenshot-8.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206683144


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * An implementation of the SplitReader that periodically polls the Kinesis 
stream to retrieve
+ * records.
+ */
+@Internal
+public class PollingKinesisShardSplitReader implements SplitReader {
+
+private static final RecordsWithSplitIds 
INCOMPLETE_SHARD_EMPTY_RECORDS =
+new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, 
false);
+
+private final StreamProxy kinesis;
+private final Deque assignedSplits = new 
ArrayDeque<>();
+
+public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
+this.kinesis = kinesisProxy;
+}
+
+@Override
+public RecordsWithSplitIds fetch() throws IOException {
+KinesisShardSplitState splitState = assignedSplits.poll();
+if (splitState == null) {
+return INCOMPLETE_SHARD_EMPTY_RECORDS;
+}
+
+GetRecordsResponse getRecordsResponse =
+kinesis.getRecords(
+splitState.getStreamArn(),
+splitState.getShardId(),
+splitState.getNextStartingPosition());
+boolean isComplete = getRecordsResponse.nextShardIterator() == null;
+
+if (hasNoRecords(getRecordsResponse)) {
+if (isComplete) {
+return new KinesisRecordsWithSplitIds(
+Collections.emptyIterator(), splitState.splitId(), 
true);
+} else {
+assignedSplits.add(splitState);
+return INCOMPLETE_SHARD_EMPTY_RECORDS;
+}
+}
+
+splitState.setNextStartingPosition(
+StartingPosition.continueFromSequenceNumber(
+getRecordsResponse
+.records()
+.get(getRecordsResponse.records().size() - 1)
+.sequenceNumber()));
+
+assignedSplits.add(splitState);
+return new KinesisRecordsWithSplitIds(
+getRecordsResponse.records().iterator(), splitState.splitId(), 
isComplete);
+}
+
+private boolean hasNoRecords(GetRecordsResponse getRecordsResponse) {
+return !getRecordsResponse.hasRecords() || 
getRecordsResponse.records().isEmpty();
+}
+
+@Override
+public void handleSplitsChanges(SplitsChange 
splitsChanges) {
+for (KinesisShardSplit split : splitsChanges.splits()) {
+assignedSplits.add(new KinesisShardSplitState(split));
+}
+}
+
+@Override
+public void wakeUp() {
+// Do nothing because we don't have any sleep mechanism
+}
+
+@Override
+public void close() throws Exception {}

Review Comment:
   Good catch. Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206671480


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+implements SplitEnumerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final String streamArn;
+private final Properties consumerConfig;
+private final StreamProxy streamProxy;
+private final KinesisShardAssigner shardAssigner;
+private final KinesisShardAssigner.Context shardAssignerContext;
+
+private final Map> splitAssignment = new 
HashMap<>();
+private final Set assignedSplitIds = new HashSet<>();
+private final Set unassignedSplits;
+private final Set completedSplitIds;
+
+private String lastSeenShardId;
+
+public KinesisStreamsSourceEnumerator(
+SplitEnumeratorContext context,
+String streamArn,
+Properties consumerConfig,
+StreamProxy streamProxy,
+KinesisStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.consumerConfig = consumerConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.completedSplitIds = new HashSet<>();
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.completedSplitIds = state.getCompletedSplitIds();
+this.lastSeenShardId = state.getLastSeenShardId();
+

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206669438


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.serialization;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the {@link DeserializationSchema} with the {@link
+ * KinesisDeserializationSchema} interface.
+ *
+ * @param  The type created by the deserialization schema.
+ */
+@Internal
+class KinesisDeserializationSchemaWrapper implements 
KinesisDeserializationSchema {
+private static final long serialVersionUID = 9143148962928375886L;
+
+private final DeserializationSchema deserializationSchema;
+
+KinesisDeserializationSchemaWrapper(DeserializationSchema 
deserializationSchema) {
+this.deserializationSchema = deserializationSchema;
+}
+
+@Override
+public void open(DeserializationSchema.InitializationContext context) 
throws Exception {
+this.deserializationSchema.open(context);
+}
+
+@Override
+public void deserialize(Record record, String stream, String shardId, 
Collector output)

Review Comment:
   They are part of the `Record` datatype



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206669002


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/UniformShardAssigner.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** An implementation of the {@link KinesisShardAssigner} that assigns splits 
uniformly. */
+@Internal
+public class UniformShardAssigner implements KinesisShardAssigner {
+@Override
+public int assign(KinesisShardSplit split, Context context) {
+int selectedSubtask = -1;
+int curMinAssignment = Integer.MAX_VALUE;
+Map> splitAssignment = 
context.getCurrentSplitAssignment();
+Map> pendingSplitAssignments =
+context.getPendingSplitAssignments();
+
+for (int subtaskId : context.getRegisteredReaders().keySet()) {
+int subtaskAssignmentSize =
+splitAssignment.getOrDefault(subtaskId, 
Collections.emptySet()).size()
++ pendingSplitAssignments
+.getOrDefault(subtaskId, 
Collections.emptyList())
+.size();
+if (subtaskAssignmentSize < curMinAssignment) {
+curMinAssignment = subtaskAssignmentSize;
+selectedSubtask = subtaskId;
+}
+}

Review Comment:
   Ok added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r120406


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+implements SplitEnumerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final String streamArn;
+private final Properties consumerConfig;
+private final StreamProxy streamProxy;
+private final KinesisShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map> splitAssignment = new 
HashMap<>();
+private final Set assignedSplitIds = new HashSet<>();
+private final Set unassignedSplits;
+
+private String lastSeenShardId;
+
+public KinesisStreamsSourceEnumerator(
+SplitEnumeratorContext context,
+String streamArn,
+Properties consumerConfig,
+StreamProxy streamProxy,
+KinesisShardAssigner shardAssigner,
+KinesisStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.consumerConfig = consumerConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = 

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206663759


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+implements SplitEnumerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final String streamArn;
+private final Properties consumerConfig;
+private final StreamProxy streamProxy;
+private final KinesisShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map> splitAssignment = new 
HashMap<>();
+private final Set assignedSplitIds = new HashSet<>();
+private final Set unassignedSplits;
+
+private String lastSeenShardId;
+
+public KinesisStreamsSourceEnumerator(
+SplitEnumeratorContext context,
+String streamArn,
+Properties consumerConfig,
+StreamProxy streamProxy,
+KinesisShardAssigner shardAssigner,
+KinesisStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.consumerConfig = consumerConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = 

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206665143


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+implements SplitEnumerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final String streamArn;
+private final Properties consumerConfig;
+private final StreamProxy streamProxy;
+private final KinesisShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map> splitAssignment = new 
HashMap<>();
+private final Set assignedSplitIds = new HashSet<>();
+private final Set unassignedSplits;
+
+private String lastSeenShardId;
+
+public KinesisStreamsSourceEnumerator(
+SplitEnumeratorContext context,
+String streamArn,
+Properties consumerConfig,
+StreamProxy streamProxy,
+KinesisShardAssigner shardAssigner,
+KinesisStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.consumerConfig = consumerConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = 

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206664219


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+implements SplitEnumerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final String streamArn;
+private final Properties consumerConfig;
+private final StreamProxy streamProxy;
+private final KinesisShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map> splitAssignment = new 
HashMap<>();
+private final Set assignedSplitIds = new HashSet<>();
+private final Set unassignedSplits;
+
+private String lastSeenShardId;
+
+public KinesisStreamsSourceEnumerator(
+SplitEnumeratorContext context,
+String streamArn,
+Properties consumerConfig,
+StreamProxy streamProxy,
+KinesisShardAssigner shardAssigner,
+KinesisStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.consumerConfig = consumerConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = 

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206662694


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+implements SplitEnumerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final String streamArn;
+private final Properties consumerConfig;
+private final StreamProxy streamProxy;
+private final KinesisShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map> splitAssignment = new 
HashMap<>();
+private final Set assignedSplitIds = new HashSet<>();
+private final Set unassignedSplits;
+
+private String lastSeenShardId;
+
+public KinesisStreamsSourceEnumerator(
+SplitEnumeratorContext context,
+String streamArn,
+Properties consumerConfig,
+StreamProxy streamProxy,
+KinesisShardAssigner shardAssigner,
+KinesisStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.consumerConfig = consumerConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = 

[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206657247


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+
+/** Utility functions to use with {@link KinesisStreamsSourceConfigConstants}. 
*/
+@Internal
+public class KinesisStreamsSourceConfigUtil {
+
+private KinesisStreamsSourceConfigUtil() {
+// private constructor to prevent initialization of utility class.
+}
+
+/**
+ * Parses the timestamp in which to start consuming from the stream, from 
the given properties.
+ *
+ * @param consumerConfig the properties to parse timestamp from
+ * @return the timestamp
+ */
+public static Date parseStreamTimestampStartingPosition(final Properties 
consumerConfig) {
+String timestamp = 
consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726598#comment-17726598
 ] 

Jark Wu commented on FLINK-32188:
-

But I'm also confused why the array constructor is not evaluated into literal 
before pushing down. 

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206656720


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner;
+import 
org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Builder to construct the {@link KinesisStreamsSource}.
+ *
+ * The following example shows the minimum setup to create a {@link 
KinesisStreamsSource} that
+ * reads String values from a Kinesis Data Streams stream with ARN of
+ * arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name.
+ *
+ * {@code
+ * KinesisStreamsSource kdsSource =
+ * KinesisStreamsSource.builder()
+ * 
.setStreamArn("arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name")
+ * .setDeserializationSchema(new SimpleStringSchema())
+ * .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * 
+ *   {@code kinesisShardAssigner} will be {@link UniformShardAssigner}
+ * 
+ *
+ * @param  type of elements that should be read from the source stream
+ */
+@Experimental
+public class KinesisStreamsSourceBuilder {
+private String streamArn;
+private Properties consumerConfig = new Properties();

Review Comment:
   Ok done. We can add feature to parse region from the ARN in a followup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31990) Use Flink Configuration to specify KDS Source configuration object

2023-05-26 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-31990:

Description: 
*What*

Use the Flink Configuration object to standardise the method of specifying 
configurations for the KDS source. 

 

Also include validations:

- Check that region in config matches ARN. ARN should take priority.

 

*Why*

We want to standardise error messages + source serialization methods 
implemented by Flink on the Flink Configuration objects.

 

  was:
*What*

Use the Flink Configuration object to standardise the method of specifying 
configurations for the KDS source. 

 

*Why*

We want to standardise error messages + source serialization methods 
implemented by Flink on the Flink Configuration objects.

 


> Use Flink Configuration to specify KDS Source configuration object
> --
>
> Key: FLINK-31990
> URL: https://issues.apache.org/jira/browse/FLINK-31990
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hong Liang Teoh
>Priority: Major
>
> *What*
> Use the Flink Configuration object to standardise the method of specifying 
> configurations for the KDS source. 
>  
> Also include validations:
> - Check that region in config matches ARN. ARN should take priority.
>  
> *Why*
> We want to standardise error messages + source serialization methods 
> implemented by Flink on the Flink Configuration objects.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27

2023-05-26 Thread via GitHub


hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1206655247


##
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+
+/** Utility functions to use with {@link KinesisStreamsSourceConfigConstants}. 
*/
+@Internal
+public class KinesisStreamsSourceConfigUtil {

Review Comment:
   Yes. I have added this to https://issues.apache.org/jira/browse/FLINK-31990



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726597#comment-17726597
 ] 

Jark Wu commented on FLINK-32188:
-

Yes, I think your fix is fine. The function name "array" in 
BuiltInFunctionDefinition is not equal to "ARRAY_VALUE_CONSTRUCTOR", I think 
that's why we can't find the function via {{lookupFunction}}.

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png
>
>
> When I customized a data source connector which assumed as image-connector, I 
> found that when creating a table with ddl, I specified a field URL as an 
> array type. When submitting an SQL task with Flink, I specified query this 
> field as a fixed array. For example, select * from image source where 
> URL=ARRAY ['/flink. jpg', '/flink_1. jpg'], but it couldn't obtain the 
> corresponding predicate filters at all.
> Does the custom connector not support  to query fields of array type with 
> "where"?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32176) [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all

2023-05-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32176:
---
Labels: pull-request-available  (was: )

> [CVE-2022-1471] Mitigate CVE from snakeyaml coming from pulsar-client-all
> -
>
> Key: FLINK-32176
> URL: https://issues.apache.org/jira/browse/FLINK-32176
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Samrat Deb
>Assignee: Samrat Deb
>Priority: Major
>  Labels: pull-request-available
>
>  
>  * *CVE ID:* {{CVE-2022-1471}}
>  * *CWE:* CWE-502 Deserialization of Untrusted Data
>  * {*}Severity{*}: Critical
> {{pulsar-client-all-2.10.0.jar (shaded: org.yaml:snakeyaml:1.30)}}
>  
> {{SnakeYaml's Constructor() class does not restrict types which can be 
> instantiated during deserialization. Deserializing yaml content provided by 
> an attacker can lead to remote code execution. We recommend using SnakeYaml's 
> SafeConsturctor when parsing untrusted content to restrict deserialization.}}
> {{}}
> {{More details : https://nvd.nist.gov/vuln/detail/CVE-2022-1471}}
> {{{}{}}}{{{}{}}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-pulsar] Samrat002 opened a new pull request, #51: [FLINK-32176] Exclude snakeyaml from pulsar-client-all

2023-05-26 Thread via GitHub


Samrat002 opened a new pull request, #51:
URL: https://github.com/apache/flink-connector-pulsar/pull/51

   
   
   ## Purpose of the change
   Mitigate the imapact of CVE in flink
   
   ## Brief change log
   
   Exclude snakeyaml from pulsar-client-all
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality
   guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
   - *Added unit tests*
   - *Added integration tests for end-to-end deployment*
   - *Manually verified by running the Pulsar connector on a local Flink 
cluster.*
   
   ## Significant changes
   
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for
   convenience.)*
   
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
   - If yes, how is this documented? (not applicable / docs / JavaDocs / 
not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32008) Protobuf format cannot work with FileSystem Connector

2023-05-26 Thread Benchao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benchao Li updated FLINK-32008:
---
Summary: Protobuf format cannot work with FileSystem Connector  (was: 
Protobuf format throws exception with Map datatype)

> Protobuf format cannot work with FileSystem Connector
> -
>
> Key: FLINK-32008
> URL: https://issues.apache.org/jira/browse/FLINK-32008
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.0
>Reporter: Xuannan Su
>Priority: Major
> Attachments: flink-protobuf-example.zip
>
>
> The protobuf format throws exception when working with Map data type. I 
> uploaded a example project to reproduce the problem.
>  
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
>     at 
> org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
>     ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> 

[jira] [Commented] (FLINK-32008) Protobuf format cannot work with FileSystem Connector

2023-05-26 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726583#comment-17726583
 ] 

Benchao Li commented on FLINK-32008:


I've changed the title.

> Protobuf format cannot work with FileSystem Connector
> -
>
> Key: FLINK-32008
> URL: https://issues.apache.org/jira/browse/FLINK-32008
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.0
>Reporter: Xuannan Su
>Priority: Major
> Attachments: flink-protobuf-example.zip
>
>
> The protobuf format throws exception when working with Map data type. I 
> uploaded a example project to reproduce the problem.
>  
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
>     at 
> org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
>     ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> 

[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP

2023-05-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726580#comment-17726580
 ] 

Matthias Pohl commented on FLINK-32204:
---

It's most likely being caused by FLINK-31995. The 
{{ZooKeeperLeaderElectionDriver}} uses a {{DirectExecutorService}} in for the 
{{TreeCache}}. The {{RejectedExecutionException}} handling was added in 
FLINK-31995. So, it could be that it reveals a bug in some other code which was 
just not visible before.

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with 
> The ExecutorService is shut down already. No Callables can be executed on AZP
> ---
>
> Key: FLINK-32204
> URL: https://issues.apache.org/jira/browse/FLINK-32204
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095]
>  fails as
> {noformat}
> May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: 
> The ExecutorService is shut down already. No Callables can be executed.
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237)
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
> May 25 18:45:50
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on pull request #22660: [FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…

2023-05-26 Thread via GitHub


zentol commented on PR #22660:
URL: https://github.com/apache/flink/pull/22660#issuecomment-1564201206

   I appreciate the work, but really this change has to go through a proper 
design process.
   
   There are open questions as to whether we really want to expose Kryo in the 
same way we did before (because the presence of Kryo serializers in the 
execution config is a problem in general).
   
   Please create a 
[FLIP](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals)
 and publish it on the dev mailing list.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22666: [Just For Test] Update japicmp configuration for 1.16.2

2023-05-26 Thread via GitHub


flinkbot commented on PR #22666:
URL: https://github.com/apache/flink/pull/22666#issuecomment-1564193372

   
   ## CI report:
   
   * f978740193f6053e57003801e870b91b398a3d18 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22665: [Just For Test] Update japicmp configuration for 1.17.1

2023-05-26 Thread via GitHub


flinkbot commented on PR #22665:
URL: https://github.com/apache/flink/pull/22665#issuecomment-1564188090

   
   ## CI report:
   
   * 5ac854d04e1f587da5f70ea96326bb4d1d674497 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] reswqa merged pull request #650: Add Flink 1.17.1

2023-05-26 Thread via GitHub


reswqa merged PR #650:
URL: https://github.com/apache/flink-web/pull/650


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] reswqa merged pull request #649: Add Flink 1.16.2

2023-05-26 Thread via GitHub


reswqa merged PR #649:
URL: https://github.com/apache/flink-web/pull/649


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32008) Protobuf format throws exception with Map datatype

2023-05-26 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726567#comment-17726567
 ] 

Ryan Skraba commented on FLINK-32008:
-

It's probably worth checking in with the community to see what we expect of a 
bulk format, or if it would be an interesting thing to add!  I'll ask on the 
mailing list.

I took a quick look but didn't see any related JIRA (outside of FLINK-12149, 
which proposes using the protobuf API to interact with parquet files).  Can a 
committer change the title of this JIRA to better reflect the issue?  Something 
like "Protobuf format on filesystem is faulty"

> Protobuf format throws exception with Map datatype
> --
>
> Key: FLINK-32008
> URL: https://issues.apache.org/jira/browse/FLINK-32008
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.0
>Reporter: Xuannan Su
>Priority: Major
> Attachments: flink-protobuf-example.zip
>
>
> The protobuf format throws exception when working with Map data type. I 
> uploaded a example project to reproduce the problem.
>  
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
> exception
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at 
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at 
> org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
>     at 
> org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
>     at 
> org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
>     ... 6 more
> Caused by: 

[jira] [Commented] (FLINK-32202) useless configuration

2023-05-26 Thread Weihua Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726566#comment-17726566
 ] 

Weihua Hu commented on FLINK-32202:
---

[~zhangdong7] Could you explain more about "but the parameter 
query.server.ports:6125 will be generated when Flink starts"? I couldn't find 
the use of "query.server.ports" in the master branch.

> useless configuration
> -
>
> Key: FLINK-32202
> URL: https://issues.apache.org/jira/browse/FLINK-32202
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Queryable State
>Affects Versions: 1.15.4
>Reporter: zhangdong7
>Priority: Minor
>
> According to the official Flink documentation, the parameter 
> query.server.ports has been replaced by queryable-state.server.ports, but the 
> parameter query.server.ports:6125 will be generated when Flink starts. Is 
> this a historical problem?
>  
> {code:java}
> public static final ConfigOption SERVER_PORT_RANGE = 
> ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The
>  port range of the queryable state server. The specified range can be a 
> single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges 
> and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new 
> String[]{"query.server.ports"});{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-32202) useless configuration

2023-05-26 Thread Weihua Hu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32202 ]


Weihua Hu deleted comment on FLINK-32202:
---

was (Author: huwh):
[~zhangdong7] Could you explain more about "but the parameter 
query.server.ports:6125 will be generated when Flink starts"? I couldn't find 
the use of "query.server.ports" in the master branch.

> useless configuration
> -
>
> Key: FLINK-32202
> URL: https://issues.apache.org/jira/browse/FLINK-32202
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Queryable State
>Affects Versions: 1.15.4
>Reporter: zhangdong7
>Priority: Minor
>
> According to the official Flink documentation, the parameter 
> query.server.ports has been replaced by queryable-state.server.ports, but the 
> parameter query.server.ports:6125 will be generated when Flink starts. Is 
> this a historical problem?
>  
> {code:java}
> public static final ConfigOption SERVER_PORT_RANGE = 
> ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The
>  port range of the queryable state server. The specified range can be a 
> single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges 
> and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new 
> String[]{"query.server.ports"});{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32202) useless configuration

2023-05-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-32202:
---
Component/s: Runtime / Queryable State
 (was: API / Core)

> useless configuration
> -
>
> Key: FLINK-32202
> URL: https://issues.apache.org/jira/browse/FLINK-32202
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Queryable State
>Affects Versions: 1.15.4
>Reporter: zhangdong7
>Priority: Minor
>
> According to the official Flink documentation, the parameter 
> query.server.ports has been replaced by queryable-state.server.ports, but the 
> parameter query.server.ports:6125 will be generated when Flink starts. Is 
> this a historical problem?
>  
> {code:java}
> public static final ConfigOption SERVER_PORT_RANGE = 
> ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The
>  port range of the queryable state server. The specified range can be a 
> single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges 
> and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new 
> String[]{"query.server.ports"});{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32202) useless configuration

2023-05-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-32202.
--
Resolution: Invalid

Queryable State is (soft) deprecated, so this should doesn't have to be fixed 
at this moment

> useless configuration
> -
>
> Key: FLINK-32202
> URL: https://issues.apache.org/jira/browse/FLINK-32202
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.15.4
>Reporter: zhangdong7
>Priority: Minor
>
> According to the official Flink documentation, the parameter 
> query.server.ports has been replaced by queryable-state.server.ports, but the 
> parameter query.server.ports:6125 will be generated when Flink starts. Is 
> this a historical problem?
>  
> {code:java}
> public static final ConfigOption SERVER_PORT_RANGE = 
> ConfigOptions.key("queryable-state.server.ports").stringType().defaultValue("9067").withDescription("The
>  port range of the queryable state server. The specified range can be a 
> single port: \"9123\", a range of ports: \"50100-50200\", or a list of ranges 
> and ports: \"50100-50200,50300-50400,51234\".").withDeprecatedKeys(new 
> String[]{"query.server.ports"});{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser commented on pull request #22664: [FLINK-32203] Remove monitoringInterval to reduce probability of ClassLoader memory leak

2023-05-26 Thread via GitHub


MartijnVisser commented on PR #22664:
URL: https://github.com/apache/flink/pull/22664#issuecomment-1564121721

   @JTaky Thanks for the PR. However, would it be possible to actually address 
the issue of the memory leak, instead of applying the mitigating measure that 
this PR introduces? 
   
   Also curious on @zentol his opinion for this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29750) Improve PostgresCatalog#listTables() by reusing resources

2023-05-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-29750.
--
Fix Version/s: jdbc-3.2.0
   Resolution: Fixed

Fixed in 
apache/flink-connector-jdbc:main 7fc202be3dfcdb6510f9855a6943dd97fa2bd3af

> Improve PostgresCatalog#listTables() by reusing resources
> -
>
> Key: FLINK-29750
> URL: https://issues.apache.org/jira/browse/FLINK-29750
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Reporter: Mingliang Liu
>Assignee: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: jdbc-3.2.0
>
>
> Currently the {{PostgresCatalog#listTables()}} creates a new connection and 
> prepared statement for every schema and table when listing tables. This can 
> be optimized by reusing those resources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-29750) Improve PostgresCatalog#listTables() by reusing resources

2023-05-26 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reopened FLINK-29750:

  Assignee: Mingliang Liu

> Improve PostgresCatalog#listTables() by reusing resources
> -
>
> Key: FLINK-29750
> URL: https://issues.apache.org/jira/browse/FLINK-29750
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Ecosystem
>Reporter: Mingliang Liu
>Assignee: Mingliang Liu
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{PostgresCatalog#listTables()}} creates a new connection and 
> prepared statement for every schema and table when listing tables. This can 
> be optimized by reusing those resources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32051) Fix broken documentation links in Flink blogs

2023-05-26 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726556#comment-17726556
 ] 

Weijie Guo commented on FLINK-32051:


merged in b64f8efce94018b87a4046c6c4d01e0aed1f5ab5.

> Fix broken documentation links in Flink blogs
> -
>
> Key: FLINK-32051
> URL: https://issues.apache.org/jira/browse/FLINK-32051
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the links to the documentations in the blogs are broken. We need 
> to add a slash ({{{}/{}}}) at the end of the param {{DocsBaseUrl}} in 
> config.toml like this:
> {noformat}
> [params]
>   DocsBaseUrl = "//nightlies.apache.org/flink/"
> {noformat}
> Also, the links in this 
> [post|https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/]
>  is not rendered correctly. We need to add a newline after the {{{}{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32051) Fix broken documentation links in Flink blogs

2023-05-26 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-32051.

Resolution: Fixed

> Fix broken documentation links in Flink blogs
> -
>
> Key: FLINK-32051
> URL: https://issues.apache.org/jira/browse/FLINK-32051
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Zhilong Hong
>Assignee: Zhilong Hong
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the links to the documentations in the blogs are broken. We need 
> to add a slash ({{{}/{}}}) at the end of the param {{DocsBaseUrl}} in 
> config.toml like this:
> {noformat}
> [params]
>   DocsBaseUrl = "//nightlies.apache.org/flink/"
> {noformat}
> Also, the links in this 
> [post|https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/]
>  is not rendered correctly. We need to add a newline after the {{{}{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] MartijnVisser commented on pull request #3: [FLINK-15462][connectors] Add Trino dialect

2023-05-26 Thread via GitHub


MartijnVisser commented on PR #3:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/3#issuecomment-1564085808

   > we’d appreciate it very much if you could please ensure that Flink JDBC 
connector Trino dialect support Kerberos authentication like Trino Python 
client,
   
   I think that should be a follow-up, and not in the initial PR. Is it also 
something that you could contribute to?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32059) Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32059:
---
Labels: pull-request-available  (was: )

> Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and 
> batch.sql.join to JUnit5
> -
>
> Key: FLINK-32059
> URL: https://issues.apache.org/jira/browse/FLINK-32059
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
>
> Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and 
> batch.sql.join to JUnit5.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on a diff in pull request #22579: [FLINK-32059] Migrate subclasses of BatchAbstractTestBase in batch.sql.agg and batch.sql.join to JUnit5

2023-05-26 Thread via GitHub


reswqa commented on code in PR #22579:
URL: https://github.com/apache/flink/pull/22579#discussion_r1206475729


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/OuterJoinITCase.scala:
##
@@ -398,7 +398,7 @@ class OuterJoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object OuterJoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   ```suggestion
 @Parameters(name = "expectedJoinType={0}")
   ```



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala:
##
@@ -470,7 +469,7 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object SemiJoinITCase {
-  @Parameterized.Parameters(name = "{0}-{1}")
+  @Parameters(name = "{0}-{1}")

Review Comment:
   It feels a bit strange, isn't this test class only one parameter? 



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala:
##
@@ -26,23 +26,31 @@ import 
org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, 
InMemoryLookupableTableSource}
 import 
org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat
 import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
+import org.apache.flink.testutils.junit.extensions.parameterized.{Parameter, 
ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
 
 import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assumptions.assumeThat
 import org.assertj.core.api.IterableAssert.assertThatIterable
-import org.junit.{After, Assume, Before, Test}
-import org.junit.Assert.assertEquals
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestTemplate}
+import org.junit.jupiter.api.Assertions.assertEquals

Review Comment:
   Junit5 assertions are also discouraged.



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala:
##
@@ -1333,7 +1335,7 @@ class JoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
 }
 
 object JoinITCase {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameters(name = "{0}")

Review Comment:
   ```suggestion
 @Parameters(name = "expectedJoinType = {0}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP

2023-05-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726545#comment-17726545
 ] 

Matthias Pohl commented on FLINK-32204:
---

FYI: CI failed on [a4de8945|https://github.com/apache/flink/commit/a4de8945]. 
That version didn't include FLINK-31776, yet.

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with 
> The ExecutorService is shut down already. No Callables can be executed on AZP
> ---
>
> Key: FLINK-32204
> URL: https://issues.apache.org/jira/browse/FLINK-32204
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095]
>  fails as
> {noformat}
> May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: 
> The ExecutorService is shut down already. No Callables can be executed.
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237)
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
> May 25 18:45:50
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP

2023-05-26 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726540#comment-17726540
 ] 

Matthias Pohl edited comment on FLINK-32204 at 5/26/23 9:14 AM:


Thanks for reporting the issue. I'm going to have a look. -But I suspect it not 
being related to the FLINK-26522 changes because the test still relies on the 
legacy {{LeaderElectionDriver}} implementation of ZooKeeper.- This specific 
test utilizes the {{DefaultLeaderElectionService}} which was touched in 
FLINK-31838 and FLINK-31773


was (Author: mapohl):
Thanks for reporting the issue. I'm going to have a look. But I suspect it not 
being related to the FLINK-26522 changes because the test still relies on the 
legacy {{LeaderElectionDriver}} implementation of ZooKeeper.

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with 
> The ExecutorService is shut down already. No Callables can be executed on AZP
> ---
>
> Key: FLINK-32204
> URL: https://issues.apache.org/jira/browse/FLINK-32204
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=7095]
>  fails as
> {noformat}
> May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: 
> The ExecutorService is shut down already. No Callables can be executed.
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237)
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
> May 25 18:45:50
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >