[jira] [Commented] (FLINK-7289) Memory allocation of RocksDB can be problematic in container environments
[ https://issues.apache.org/jira/browse/FLINK-7289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445353#comment-16445353 ] Sihua Zhou commented on FLINK-7289: --- Hi [~srichter], I think it's quite hard to tell what the best configuration for RocksDB is, but there is one practice we follow when using the RocksDBBackend: we always set {{taskmanager.memory.size}}, the parameter that defines the off-heap memory size used by the TM. It may look a bit weird to set this parameter for a streaming job (AFAIK, currently only batch jobs use off-heap memory to create direct memory buffers); we set it in order to reserve that off-heap memory for RocksDB. RocksDB neither respects this parameter nor is controlled by it, but it splits the total memory into two parts: heap memory used by the JVM and kept well in check by GC, and off-heap memory that we reserve for RocksDB. This makes things safer for us, although the parameter's name ({{taskmanager.memory.size}}) looks odd for this purpose in a streaming job at first glance. What do you think? > Memory allocation of RocksDB can be problematic in container environments > - > > Key: FLINK-7289 > URL: https://issues.apache.org/jira/browse/FLINK-7289 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.3.0, 1.4.0 >Reporter: Stefan Richter >Priority: Major > > Flink's RocksDB based state backend allocates native memory. The amount of memory allocated by RocksDB is not under the control of Flink or the JVM and can (theoretically) grow without limits. > In container environments, this can be problematic because the process can exceed the memory budget of the container, and the process will get killed. > Currently, there is no other option than trusting RocksDB to be well behaved and to follow its memory configurations.
However, limiting RocksDB's memory usage is not as easy as setting a single limit parameter. The memory limit is determined by an interplay of several configuration parameters, which is almost impossible to get right for users. Even worse, multiple RocksDB instances can run inside the same process and make reasoning about the configuration also dependent on the Flink job. Some information about the memory management in RocksDB can be found here: > https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB > https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide > We should try to figure out ways to help users in one or more of the following ways: > - Some way to autotune or calculate the RocksDB configuration. > - Conservative default values. > - Additional documentation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
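The interplay described above can at least be bounded roughly. A back-of-the-envelope estimate, following the formula suggested in the RocksDB wiki pages linked above (memtables + block cache + index/filter blocks, times the number of RocksDB instances in the process), might look as follows; all sizes and the instance count are illustrative assumptions, not values taken from Flink's configuration:

```java
/**
 * Rough upper-bound estimate of RocksDB native memory, per the formula
 * suggested in the RocksDB memory-usage wiki: memtables + block cache +
 * index/filter blocks. All numbers below are illustrative assumptions.
 */
class RocksDbMemoryEstimate {

    static long estimateBytes(long writeBufferSize,
                              int maxWriteBufferNumber,
                              int numColumnFamilies,
                              long blockCacheSize,
                              long indexAndFilterBytes) {
        // Each column family can hold up to maxWriteBufferNumber memtables.
        long memtables = writeBufferSize * maxWriteBufferNumber * numColumnFamilies;
        return memtables + blockCacheSize + indexAndFilterBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Example: 64 MB write buffers, 2 per column family, 3 column
        // families, 256 MB block cache, 64 MB of index/filter blocks.
        long perInstance = estimateBytes(64 * mb, 2, 3, 256 * mb, 64 * mb);
        int rocksDbInstances = 4; // e.g. stateful operators * slots per TM
        System.out.println("per instance MB: " + perInstance / mb);
        System.out.println("total MB: " + rocksDbInstances * perInstance / mb);
    }
}
```

Comparing such an estimate against the container budget minus the JVM heap is one way to sanity-check a configuration before deployment.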
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445330#comment-16445330 ] ASF GitHub Bot commented on FLINK-8836: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5880 +1. Will this PR also get into 1.4.x? > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.5.0 > > > The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code:java}
public KryoSerializer duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;
    kryoRegistrations = toCopy.kryoRegistrations;
    ...
}
{code}
Shortly put, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are handed directly to the new {{KryoSerializer}} instance. As a result, those default serializers are shared across two different {{KryoSerializer}} instances, so the result is not a correct duplicate.
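The fix implied by the issue is to stop sharing serializer instances between duplicates. A minimal sketch of the idea, using stand-in classes (none of these names are Flink's actual API): keep factories for the default serializers and re-instantiate them in duplicate():

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Illustration of the duplication bug: sharing stateful serializer
 * instances across duplicates is unsafe, so duplicate() must create
 * fresh instances. All class names here are hypothetical stand-ins.
 */
class SharedStateSerializer {
    StringBuilder scratch = new StringBuilder(); // stateful, not thread-safe
}

class DuplicatableSerializer {
    // Keep factories rather than instances, so each duplicate can
    // build its own serializer objects.
    final Map<String, Supplier<SharedStateSerializer>> defaultSerializerFactories;
    final Map<String, SharedStateSerializer> defaultSerializers = new LinkedHashMap<>();

    DuplicatableSerializer(Map<String, Supplier<SharedStateSerializer>> factories) {
        this.defaultSerializerFactories = factories;
        factories.forEach((name, factory) -> defaultSerializers.put(name, factory.get()));
    }

    DuplicatableSerializer duplicate() {
        // Re-instantiate from the factories instead of copying references.
        return new DuplicatableSerializer(defaultSerializerFactories);
    }
}
```

With this shape, two duplicates never share a mutable serializer object, which is what makes per-thread duplication safe.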
[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445315#comment-16445315 ] ASF GitHub Bot commented on FLINK-9190: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5881 [FLINK-9190][yarn] fix YarnResourceManager sometimes does not request new Containers ## What is the purpose of the change This PR fixes the problem that `YarnResourceManager` does not request new containers when a container is killed before registering with the `ResourceManager`. ## Brief change log - *fix YarnResourceManager sometimes does not request new Containers* ## Verifying this change - *added a unit test, `YarnResourceManagerTest#testKillContainerBeforeTMRegisterSuccessfully()`, to verify this* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (*yes*) - The S3 file system connector: (no) ## Documentation no You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink fixYarnResourceManagerRequestContainers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5881.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5881 commit bbf03ca7fc709e11627560466bff01b9e750bbd2 Author: sihuazhou Date: 2018-04-20T05:02:28Z fix YarnResourceManager sometimes does not request new Containers > YarnResourceManager sometimes does not request new Containers > - > > Key: FLINK-9190 > URL: https://issues.apache.org/jira/browse/FLINK-9190 > Project:
Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.5.0 > Environment: Hadoop 2.8.3 > ZooKeeper 3.4.5 > Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8 >Reporter: Gary Yao >Assignee: Sihua Zhou >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: yarn-logs > > > *Description* > The {{YarnResourceManager}} does not request new containers if > {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is > restarted due to {{NoResourceAvailableException}}, and the job runs normally > afterwards. I suspect that {{TaskManager}} failures are not registered if the > failure occurs before the {{TaskManager}} registers with the master. Logs are > attached; I added additional log statements to > {{YarnResourceManager.onContainersCompleted}} and > {{YarnResourceManager.onContainersAllocated}}. > *Expected Behavior* > The {{YarnResourceManager}} should recognize that the container is completed > and keep requesting new containers. The job should run as soon as resources > are available. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
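The intended bookkeeping can be modeled in a few lines: when a container completes, whether or not its TaskManager ever registered, the resource manager must notice the shortfall and request a replacement. The sketch below is a simplified stand-in, not the actual YarnResourceManager code:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Minimal model of the intended bookkeeping: a container that completes
 * before its TaskManager ever registers must still trigger a replacement
 * request. All names are hypothetical stand-ins, not Flink's classes.
 */
class ContainerBookkeeping {
    final int targetContainers;
    final Set<String> allocated = new HashSet<>(); // allocated, possibly unregistered
    int pendingRequests = 0;

    ContainerBookkeeping(int target) { this.targetContainers = target; }

    void onContainerAllocated(String id) {
        pendingRequests = Math.max(0, pendingRequests - 1);
        allocated.add(id);
    }

    /** Called for both registered and never-registered containers. */
    void onContainerCompleted(String id) {
        allocated.remove(id);
        // Re-request if we are now below target, counting in-flight requests.
        if (allocated.size() + pendingRequests < targetContainers) {
            pendingRequests++; // stands in for a new YARN container request
        }
    }
}
```

The bug reported here corresponds to onContainerCompleted being a no-op for containers whose TaskManager never registered, so the shortfall was never noticed.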
[jira] [Assigned] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9190: - Assignee: Sihua Zhou > YarnResourceManager sometimes does not request new Containers > - > > Key: FLINK-9190 > URL: https://issues.apache.org/jira/browse/FLINK-9190 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.5.0 > Environment: Hadoop 2.8.3 > ZooKeeper 3.4.5 > Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8 >Reporter: Gary Yao >Assignee: Sihua Zhou >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: yarn-logs > > > *Description* > The {{YarnResourceManager}} does not request new containers if > {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is > restarted due to {{NoResourceAvailableException}}, and the job runs normally > afterwards. I suspect that {{TaskManager}} failures are not registered if the > failure occurs before the {{TaskManager}} registers with the master. Logs are > attached; I added additional log statements to > {{YarnResourceManager.onContainersCompleted}} and > {{YarnResourceManager.onContainersAllocated}}. > *Expected Behavior* > The {{YarnResourceManager}} should recognize that the container is completed > and keep requesting new containers. The job should run as soon as resources > are available. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445084#comment-16445084 ] ASF GitHub Bot commented on FLINK-8302: --- Github user dubin555 closed the pull request at: https://github.com/apache/flink/pull/5202 > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: DuBin >Priority: Major > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in the Table API: shift_left(input, n) acts as input << n, and shift_right(input, n) acts as input >> n.
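For reference, the requested semantics are plain bit shifts. A stand-alone illustration (not the Table API implementation itself; note that `>>` in Java is the sign-preserving arithmetic shift):

```java
/**
 * Semantics of the proposed functions: SHIFT_LEFT(input, n) == input << n
 * and SHIFT_RIGHT(input, n) == input >> n (arithmetic, sign-preserving).
 */
class ShiftSemantics {
    static long shiftLeft(long input, int n)  { return input << n; }
    static long shiftRight(long input, int n) { return input >> n; }

    public static void main(String[] args) {
        System.out.println(shiftLeft(3, 2));   // 3 << 2 == 12
        System.out.println(shiftRight(-8, 1)); // -8 >> 1 == -4
    }
}
```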
[jira] [Assigned] (FLINK-9223) bufferConsumers should be closed in SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd
[ https://issues.apache.org/jira/browse/FLINK-9223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9223: --- Assignee: vinoyang > bufferConsumers should be closed in SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd > - > > Key: FLINK-9223 > URL: https://issues.apache.org/jira/browse/FLINK-9223 > Project: Flink > Issue Type: Test >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > >
{code}
BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
    BufferBuilder::createBufferConsumer
).toArray(BufferConsumer[]::new);
{code}
After the operation on bufferConsumers is done, the BufferConsumers in the array should be closed.
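A common shape for the fix is a try/finally that closes every element of the array even when the test body throws. The sketch below models BufferConsumer with a plain AutoCloseable; it is a pattern illustration, not Flink's test code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Pattern for the fix: guarantee that every element of an array of
 * closeable resources is closed, even if the body throws. BufferConsumer
 * is modeled by a simple AutoCloseable; this is not Flink's class.
 */
class CloseAllPattern {
    static int closedCount = 0;

    static AutoCloseable newConsumer() {
        return () -> closedCount++;
    }

    static void runWithConsumers(int n, Runnable body) {
        AutoCloseable[] consumers = new AutoCloseable[n];
        for (int i = 0; i < n; i++) consumers[i] = newConsumer();
        try {
            body.run();
        } finally {
            // Close every consumer, collecting (not swallowing) failures.
            List<Exception> failures = new ArrayList<>();
            for (AutoCloseable c : consumers) {
                try { c.close(); } catch (Exception e) { failures.add(e); }
            }
            if (!failures.isEmpty()) throw new RuntimeException(failures.get(0));
        }
    }
}
```

Collecting failures (instead of throwing on the first) ensures later consumers are still closed when an earlier close() fails.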
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444501#comment-16444501 ] ASF GitHub Bot commented on FLINK-9168: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5845 Welcome to the community! @sijie I think for this a general discussion email thread will be enough. > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9210) Call getValue() only once in gauge serialization
[ https://issues.apache.org/jira/browse/FLINK-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1658#comment-1658 ] ASF GitHub Bot commented on FLINK-9210: --- Github user Silven1117 commented on the issue: https://github.com/apache/flink/pull/5875 Thanks that would be great! Will assign myself next time I create issues. > Call getValue() only once in gauge serialization > > > Key: FLINK-9210 > URL: https://issues.apache.org/jira/browse/FLINK-9210 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.2 >Reporter: Jisu You >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.5.0, 1.4.3 > > > MetricDumpSerialization.serializeGauge() calls gauge.getValue twice in gauge > serialization. This is troublesome for those who reset gauges in getValue. > serializeGauge() should only call getValue once. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
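The fix boils down to caching the result of a single getValue() call and reusing it. An illustrative sketch (the names are stand-ins, not the actual MetricDumpSerialization API):

```java
import java.util.function.Supplier;

/**
 * The fix in one line: read the gauge once, then reuse the cached value
 * for both the null check and the serialized payload. Names here are
 * illustrative stand-ins, not Flink's API.
 */
class GaugeSerialization {
    static int getValueCalls = 0;

    static String serializeGauge(Supplier<Object> gauge) {
        Object value = gauge.get(); // call getValue() exactly once
        if (value == null) {
            return "null";
        }
        return value.toString(); // reuse the cached value
    }
}
```

This matters precisely for the case described in the issue: a gauge that resets its state inside getValue() would otherwise be reset twice per dump.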
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1618#comment-1618 ] ASF GitHub Bot commented on FLINK-9168: --- Github user sijie commented on the issue: https://github.com/apache/flink/pull/5845 @tzulitai thank you for your comments. Glad to hear your opinions about Pulsar connectors. I was the person who originally initiated the idea of Flink Pulsar connectors with @XiaoZYang, and I am also on the Pulsar IPMC. Although Pulsar is a young project, it is developing very actively: we have committers from various companies and pretty good adoption. From the Pulsar community's perspective, we are happy to commit to developing/maintaining Pulsar connectors; I hope this helps clear some of your concerns. As the next step, I am happy to start the email thread on the Flink mailing list. Should this be a FLIP, or just a general discussion email thread? > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444399#comment-16444399 ] ASF GitHub Bot commented on FLINK-8689: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ Hi @fhueske. Thanks for the review, all very good points. I will follow up with the next steps. Actually @hequn8128 and I had some discussions regarding the follow-up in FLINK-8690 already, and I created 2 different approaches. Please kindly take a look when you have time :-) Best, Rong > Add runtime support of distinct filter using MapView > - > > Key: FLINK-8689 > URL: https://issues.apache.org/jira/browse/FLINK-8689 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > This ticket should cover distinct aggregate function support to codegen for *AggregateCall*, where the *isDistinct* field is set to true. > This can be verified using the following SQL, which is not currently producing correct results. > {code:java} > SELECT > a, > SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) > FROM > MyTable{code}
[jira] [Created] (FLINK-9223) bufferConsumers should be closed in SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd
Ted Yu created FLINK-9223: - Summary: bufferConsumers should be closed in SpillableSubpartitionTest#testConsumeSpilledPartitionSpilledBeforeAdd Key: FLINK-9223 URL: https://issues.apache.org/jira/browse/FLINK-9223 Project: Flink Issue Type: Test Reporter: Ted Yu
{code}
BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
    BufferBuilder::createBufferConsumer
).toArray(BufferConsumer[]::new);
{code}
After the operation on bufferConsumers is done, the BufferConsumers in the array should be closed.
[jira] [Comment Edited] (FLINK-2685) TaskManager deadlock on NetworkBufferPool
[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444342#comment-16444342 ] Amit Jain edited comment on FLINK-2685 at 4/19/18 4:42 PM: --- [~NicoK] I've checked in WebUI and found there is no progress at all. This issue is coming up randomly, we have also observed that there are cases where few jobs hardly need to work with few MB of data and still hung up. {noformat} { "nodes": [ { "id": 5, "type": "source", "pact": "Data Source", "contents": "at createInput(ExecutionEnvironment.java:553) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)", "parallelism": "1", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" }], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 4, "type": "pact", "pact": "Map", "contents": "Data Source Parquet s3a://limeroad-logs/emr-testing/ldp_test/mongo/mongo_userDB.loves_loves/main/1524148386302/", "parallelism": "1", "predecessors": [ {"id": 5, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" }], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 3, "type": "pact", "pact": "Map", "contents": "Key Extractor", "parallelism": "1", "predecessors": [ {"id": 4, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. 
Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" }], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 11, "type": "source", "pact": "Data Source", "contents": "at createInput(ExecutionEnvironment.java:553) (org.apache.flink.api.java.io.TextInputFormat)", "parallelism": "1", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" }], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value":
[jira] [Closed] (FLINK-9206) CheckpointCoordinator log messages do not contain the job ID
[ https://issues.apache.org/jira/browse/FLINK-9206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9206. --- Resolution: Fixed master: 727370aacf63cefc6aed7c46dc2d63517e4b708d 1.5: 71c2ac313315cb6b2b5d269041788b9907e07de1 1.4: e3433d9300ad4ae35693ff76c2de987a37724c2f > CheckpointCoordinator log messages do not contain the job ID > > > Key: FLINK-9206 > URL: https://issues.apache.org/jira/browse/FLINK-9206 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.5.0, 1.4.3 > > > The {{CheckpointCoordinator}} exists per job but several of its log messages > do not contain the job ID and thus if multiple jobs exist, we could not track > which log message belongs to which job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
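The improvement amounts to threading the job ID into every checkpoint log message so that lines from concurrent jobs can be told apart. A sketch of the message shape (the format string is illustrative; the exact wording merged into Flink may differ):

```java
/**
 * Sketch of the improvement: include the job ID in each checkpoint log
 * message. The message format here is an illustrative assumption, not
 * the exact wording used by Flink's CheckpointCoordinator.
 */
class CheckpointLogging {
    static String withJobId(String jobId, long checkpointId, String event) {
        return String.format("Checkpoint %d for job %s: %s", checkpointId, jobId, event);
    }

    public static void main(String[] args) {
        // With the job ID present, grep-ing a multi-job log per job works.
        System.out.println(withJobId("deadbeef00000000", 17, "triggered"));
    }
}
```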
[jira] [Commented] (FLINK-2685) TaskManager deadlock on NetworkBufferPool
[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444342#comment-16444342 ] Amit Jain commented on FLINK-2685: -- [~NicoK] I checked in the WebUI and found there is no progress at all. This issue comes up randomly; we have also observed cases where jobs that hardly need to work with more than a few MB of data still hang. > TaskManager deadlock on NetworkBufferPool > - > > Key: FLINK-2685 > URL: https://issues.apache.org/jira/browse/FLINK-2685 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 0.10.0 >Reporter: Greg Hogan >Assignee: Ufuk Celebi >Priority: Major > Attachments: job_manager_19_feb_15_30_running, > task_manager_19_feb_15_30_running > > > This deadlock occurs intermittently. I have a {{join}} followed by a {{chain}} followed by a {{reduceGroup}}. Stack traces and local variables from one each of the {{join}} threads below. > The {{join}}'s are waiting on a buffer to become available ({{networkBufferPool.availableMemorySegments.count=0}}). Both {{LocalBufferPool}}'s have been given extra capacity ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity ({{currentPoolSize=numberOfRequestedMemorySegments=60}}) yet the second {{join}} has not acquired any ({{numberOfRequestedMemorySegments=0}}). > {{LocalBufferPool.returnExcessMemorySegments}} only recycles {{MemorySegment}}'s from its {{availableMemorySegments}}, so any requested {{Buffer}}'s will only be released when explicitly recycled.
> First join stack trace and variable values from > {{LocalBufferPool.requestBuffer}}: > {noformat} > owns: SpanningRecordSerializer (id=723) > waiting for: ArrayDeque (id=724) > Object.wait(long) line: not available [native method] > LocalBufferPool.requestBuffer(boolean) line: 163 > LocalBufferPool.requestBufferBlocking() line: 133 > RecordWriter.emit(T) line: 92 > OutputCollector.collect(T) line: 65 > JoinOperator$ProjectFlatJoinFunction .join(T1, T2, Collector) > line: 1088 > ReusingBuildSecondHashMatchIterator .callWithNextKey(FlatJoinFunction , > Collector) line: 137 > JoinDriver .run() line: 208 > RegularPactTask .run() line: 489 > RegularPactTask.invoke() line: 354 > Task.run() line: 581 > Thread.run() line: 745 > {noformat} > {noformat} > this LocalBufferPool (id=403) > availableMemorySegments ArrayDeque (id=398) > elementsObject[16] (id=422) > head14 > tail14 > currentPoolSize 60 > isDestroyed false > networkBufferPool NetworkBufferPool (id=354) > allBufferPools HashSet (id=424) > availableMemorySegments ArrayBlockingQueue (id=427) > count 0 > items Object[10240] (id=674) > itrsnull > lockReentrantLock (id=675) > notEmpty > AbstractQueuedSynchronizer$ConditionObject (id=678) > notFull AbstractQueuedSynchronizer$ConditionObject > (id=679) > putIndex6954 > takeIndex 6954 > factoryLock Object (id=430) > isDestroyed false > managedBufferPools HashSet (id=431) > memorySegmentSize 32768 > numTotalRequiredBuffers 3226 > totalNumberOfMemorySegments 10240 > numberOfRequestedMemorySegments 60 > numberOfRequiredMemorySegments 32 > owner null > registeredListeners ArrayDeque (id=421) > elementsObject[16] (id=685) > head0 > tail0 > askToRecycle false > isBlockingtrue > {noformat} > Second join stack trace and variable values from > {{SingleInputGate.getNextBufferOrEvent}}: > {noformat} > Unsafe.park(boolean, long) line: not available [native method] > LockSupport.parkNanos(Object, long) line: 215 > AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078 
> LinkedBlockingQueue.poll(long, TimeUnit) line: 467 > SingleInputGate.getNextBufferOrEvent() line: 414 > MutableRecordReader(AbstractRecordReader).getNextRecord(T) line: 79 > MutableRecordReader.next(T) line: 34 > ReaderIterator.next(T) line: 59 > MutableHashTable$ProbeIterator.next() line: 1581 >
[jira] [Assigned] (FLINK-8715) RocksDB does not propagate reconfiguration of serializer to the states
[ https://issues.apache.org/jira/browse/FLINK-8715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-8715: -- Assignee: Tzu-Li (Gordon) Tai > RocksDB does not propagate reconfiguration of serializer to the states > -- > > Key: FLINK-8715 > URL: https://issues.apache.org/jira/browse/FLINK-8715 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Arvid Heise >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > Any changes to the serializer done in #ensureCompability are lost during the > state creation. > In particular, > [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L68] > always uses a fresh copy of the StateDescriptor. > An easy fix is to pass the reconfigured serializer as an additional parameter > in > [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L1681] > , which can be retrieved through the side-output of getColumnFamily > {code:java} > kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer() > {code} > I encountered it in 1.3.2 but the code in the master seems unchanged (hence > the pointer into master). I encountered it in ValueState, but I suspect the > same issue can be observed for all kinds of RocksDB states. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
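The proposed fix is to use the reconfigured serializer that the backend already holds instead of the serializer from a fresh copy of the StateDescriptor. A toy model of the bug and the fix, using stand-in classes rather than Flink's actual RocksDB backend API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Model of the bug and fix: after compatibility checks reconfigure a
 * serializer, state creation must use the reconfigured instance from the
 * backend's registry, not the serializer from a fresh descriptor copy.
 * All classes here are illustrative stand-ins, not Flink's API.
 */
class Serializer {
    final String config;
    Serializer(String config) { this.config = config; }
}

class StateRegistry {
    final Map<String, Serializer> reconfigured = new HashMap<>();

    void registerReconfigured(String stateName, Serializer s) {
        reconfigured.put(stateName, s);
    }

    // Buggy shape: ignores the registry and uses the descriptor's copy,
    // so any reconfiguration done in the compatibility check is lost.
    Serializer createStateBuggy(String stateName, Serializer descriptorCopy) {
        return descriptorCopy;
    }

    // Fixed shape: prefer the reconfigured serializer when one exists.
    Serializer createStateFixed(String stateName, Serializer descriptorCopy) {
        return reconfigured.getOrDefault(stateName, descriptorCopy);
    }
}
```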
[jira] [Commented] (FLINK-9207) Client returns SUCCESS(0) return code for canceled job
[ https://issues.apache.org/jira/browse/FLINK-9207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444306#comment-16444306 ] Amit Jain commented on FLINK-9207: -- [~aljoscha] I believe many of us submit batch jobs via some scheduler, and the CLI job executor is expected to report the error in the mentioned case. I understand there are other ways to achieve the same. We are currently submitting our jobs in non-detached mode and ran into this problem. [~gjy] My intention was to make you aware of the issue. I would like to have it as a Blocker, as our current setup expects this. Please feel free to set the priority according to the merit of the problem. > Client returns SUCCESS(0) return code for canceled job > -- > > Key: FLINK-9207 > URL: https://issues.apache.org/jira/browse/FLINK-9207 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 > Environment: Version: 1.5.0, Commit : 2af481a >Reporter: Amit Jain >Priority: Minor > Fix For: 1.5.0 > > > The Flink Client returns a zero return code when a job is deliberately canceled. > Steps to reproduce it: > 1. bin/flink run -p 10 -m yarn-cluster -yjm 1024 -ytm 12288 WordCount.jar > 2. User externally canceled the job. > 3. Job Manager marked the job as CANCELED. > 4. Although the client code emits the following log, it still returns a zero return code. > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killed > application application_1523726493647_. > A job scheduler like Airflow would have a hard time detecting whether the > submitted job was canceled or not.
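The scheduler-facing concern above boils down to mapping a job's terminal status to a process exit code. A minimal hypothetical sketch follows; the enum and the specific codes are illustrative, not Flink's actual CLI behavior.

```java
// Hypothetical sketch: map a job's terminal status to a CLI exit code so that
// schedulers like Airflow can distinguish CANCELED from FINISHED.
// The enum values and code numbers are illustrative assumptions.
public class ExitCodeSketch {
    enum JobStatus { FINISHED, CANCELED, FAILED }

    static int exitCodeFor(JobStatus status) {
        switch (status) {
            case FINISHED: return 0; // success
            case CANCELED: return 2; // deliberate cancellation: non-zero so schedulers notice
            default:       return 1; // failure
        }
    }

    public static void main(String[] args) {
        // A non-detached CLI run would end with something like:
        // System.exit(exitCodeFor(finalStatus));
        System.out.println(exitCodeFor(JobStatus.CANCELED)); // prints 2
    }
}
```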
[jira] [Updated] (FLINK-9189) Add a SBT Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9189: --- Summary: Add a SBT Quickstart (was: Add a SBT Quickstarts) > Add a SBT Quickstart > > > Key: FLINK-9189 > URL: https://issues.apache.org/jira/browse/FLINK-9189 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Stephan Ewen >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but I observed SBT users to > get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444291#comment-16444291 ] Nico Kruber commented on FLINK-9222: actually, what is wrong with the existing SBT quickstarts at https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#sbt ? > Add a Gradle Quickstart > --- > > Key: FLINK-9222 > URL: https://issues.apache.org/jira/browse/FLINK-9222 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but there is none for Gradle, > and Gradle users get this wrong quite often.
[jira] [Issue Comment Deleted] (FLINK-9222) Add a Gradle Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9222: --- Comment: was deleted (was: actually, what is wrong with the existing SBT quickstarts at https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#sbt ?) > Add a Gradle Quickstart > --- > > Key: FLINK-9222 > URL: https://issues.apache.org/jira/browse/FLINK-9222 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but there is none for Gradle > and Gradle users to get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9189) Add a SBT Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444292#comment-16444292 ] Nico Kruber commented on FLINK-9189: actually, what is wrong with the existing SBT quickstarts at https://ci.apache.org/projects/flink/flink-docs-master/quickstart/scala_api_quickstart.html#sbt ? > Add a SBT Quickstarts > - > > Key: FLINK-9189 > URL: https://issues.apache.org/jira/browse/FLINK-9189 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Stephan Ewen >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but I observed SBT users to > get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9189) Add a SBT Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9189: --- Summary: Add a SBT Quickstarts (was: Add a SBT and Gradle Quickstarts) > Add a SBT Quickstarts > - > > Key: FLINK-9189 > URL: https://issues.apache.org/jira/browse/FLINK-9189 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Stephan Ewen >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but I observed SBT and Gradle > users to get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9189) Add a SBT Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9189: --- Description: Having a proper project template helps a lot in getting dependencies right. For example, setting the core dependencies to "provided", the connector / library dependencies to "compile", etc. The Maven quickstarts are in good shape by now, but I observed SBT users to get this wrong quite often. was: Having a proper project template helps a lot in getting dependencies right. For example, setting the core dependencies to "provided", the connector / library dependencies to "compile", etc. The Maven quickstarts are in good shape by now, but I observed SBT and Gradle users to get this wrong quite often. > Add a SBT Quickstarts > - > > Key: FLINK-9189 > URL: https://issues.apache.org/jira/browse/FLINK-9189 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Stephan Ewen >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but I observed SBT users to > get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9222) Add a Gradle Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9222: --- Description: Having a proper project template helps a lot in getting dependencies right. For example, setting the core dependencies to "provided", the connector / library dependencies to "compile", etc. The Maven quickstarts are in good shape by now, but there is none for Gradle and Gradle users to get this wrong quite often. was: Having a proper project template helps a lot in getting dependencies right. For example, setting the core dependencies to "provided", the connector / library dependencies to "compile", etc. The Maven quickstarts are in good shape by now, but I observed SBT and Gradle users to get this wrong quite often. > Add a Gradle Quickstart > --- > > Key: FLINK-9222 > URL: https://issues.apache.org/jira/browse/FLINK-9222 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but there is none for Gradle > and Gradle users to get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9222) Add a Gradle Quickstart
Nico Kruber created FLINK-9222: -- Summary: Add a Gradle Quickstart Key: FLINK-9222 URL: https://issues.apache.org/jira/browse/FLINK-9222 Project: Flink Issue Type: Improvement Components: Quickstarts Reporter: Nico Kruber Having a proper project template helps a lot in getting dependencies right. For example, setting the core dependencies to "provided", the connector / library dependencies to "compile", etc. The Maven quickstarts are in good shape by now, but I observed SBT and Gradle users to get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9222) Add a Gradle Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber reassigned FLINK-9222: -- Assignee: Nico Kruber > Add a Gradle Quickstart > --- > > Key: FLINK-9222 > URL: https://issues.apache.org/jira/browse/FLINK-9222 > Project: Flink > Issue Type: Improvement > Components: Quickstarts >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but I observed SBT and Gradle > users to get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9202) AvroSerializer should not be serializing the target Avro type class
[ https://issues.apache.org/jira/browse/FLINK-9202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444261#comment-16444261 ] Timo Walther commented on FLINK-9202: - I could reproduce the exception {{"local class incompatible: stream classdesc serialVersionUID = -5332488931363852176, local class serialVersionUID = -8084632352057382365"}}. I will work on a fix. > AvroSerializer should not be serializing the target Avro type class > --- > > Key: FLINK-9202 > URL: https://issues.apache.org/jira/browse/FLINK-9202 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Timo Walther >Priority: Critical > > The {{AvroSerializer}} contains this field which is written when the > serializer is written into savepoints: > [https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L78] > This causes Avro schema evolution to not work properly, because Avro > generated classes have non-fixed serialVersionUIDs. Once a new Avro class is > generated with a new schema, that class can not be loaded on restore due to > incompatible UIDs, and thus the serializer can not be successfully > deserialized. > A possible solution would be to only write the classname, and dynamically > load the class into a transient field.
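The "only write the classname" fix proposed in the ticket can be sketched as follows. The holder class and its names are illustrative, not the actual AvroSerializer change: the serialized form carries only the class name (a String, whose serialVersionUID never changes), and the Class object is resolved lazily into a transient field.

```java
import java.io.Serializable;

// Sketch of the proposed fix: persist only the class *name* and resolve the
// Class lazily into a transient field, so serialVersionUID changes in
// regenerated Avro classes cannot break deserialization of the serializer
// itself. Names are illustrative.
public class ClassByNameHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String typeName;   // only the name is written out
    private transient Class<?> type; // resolved on demand, never serialized

    public ClassByNameHolder(Class<?> type) {
        this.typeName = type.getName();
        this.type = type;
    }

    public Class<?> getType() {
        if (type == null) { // after deserialization the transient field is null
            try {
                type = Class.forName(typeName); // dynamic load on restore
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("Cannot restore type " + typeName, e);
            }
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(new ClassByNameHolder(java.util.ArrayList.class).getType().getName());
    }
}
```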
[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Lemer updated FLINK-9221: -- Component/s: DataSet API > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task > Components: DataSet API, DataStream API >Affects Versions: 1.5.0 >Reporter: Josh Lemer >Priority: Minor > Labels: flink > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
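The proposed contramap can be sketched in plain Java using a minimal stand-in for SinkFunction (the real interface lives in org.apache.flink.streaming.api.functions.sink and has more methods; this simplified one only has invoke). The sketch shows the essence of the request: adapting a sink for A into a sink for B via a function B -> A.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Minimal stand-in for Flink's SinkFunction, for illustration only.
interface SinkFunction<T> {
    void invoke(T value);

    // contramap: adapt a SinkFunction<T> into a SinkFunction<B>, given f: B -> T.
    default <B> SinkFunction<B> contramap(Function<? super B, ? extends T> f) {
        return value -> this.invoke(f.apply(value));
    }
}

public class ContramapSketch {
    public static void main(String[] args) {
        // A "complex" string sink we want to reuse (here: just collects values).
        List<String> collected = new ArrayList<>();
        SinkFunction<String> stringSink = collected::add;

        // Reuse it for List<Integer> by mapping each list to a comma-joined string,
        // mirroring the _.mkString(",") example from the ticket.
        SinkFunction<List<Integer>> intListSink =
            stringSink.contramap(xs -> {
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < xs.size(); i++) {
                    if (i > 0) sb.append(",");
                    sb.append(xs.get(i));
                }
                return sb.toString();
            });

        intListSink.invoke(Arrays.asList(1, 2, 3));
        System.out.println(collected); // prints [1,2,3]
    }
}
```

This is the standard contravariant-functor pattern: the type parameter appears only in input position, so the adapter simply pre-composes the mapping function before delegating to the original sink.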
[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444250#comment-16444250 ] Josh Lemer commented on FLINK-9221: --- Fixed! > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Josh Lemer >Priority: Minor > Labels: flink > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Lemer updated FLINK-9221: -- Labels: flink (was: ) > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Josh Lemer >Priority: Minor > Labels: flink > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Lemer updated FLINK-9221: -- Affects Version/s: 1.5.0 > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Josh Lemer >Priority: Minor > Labels: flink > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9207) Client returns SUCCESS(0) return code for canceled job
[ https://issues.apache.org/jira/browse/FLINK-9207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444252#comment-16444252 ] Gary Yao commented on FLINK-9207: - [~amit.jain] Is the priority only _minor_ for you? > Client returns SUCCESS(0) return code for canceled job > -- > > Key: FLINK-9207 > URL: https://issues.apache.org/jira/browse/FLINK-9207 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 > Environment: Version: 1.5.0, Commit : 2af481a >Reporter: Amit Jain >Priority: Minor > Fix For: 1.5.0 > > > The Flink Client returns a zero return code when a job is deliberately canceled. > Steps to reproduce it: > 1. bin/flink run -p 10 -m yarn-cluster -yjm 1024 -ytm 12288 WordCount.jar > 2. User externally canceled the job. > 3. Job Manager marked the job as CANCELED. > 4. Although the client code emits the following log, it still returns a zero return code. > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killed > application application_1523726493647_. > A job scheduler like Airflow would have a hard time detecting whether the > submitted job was canceled or not.
[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Lemer updated FLINK-9221: -- Component/s: DataStream API > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Josh Lemer >Priority: Minor > Labels: flink > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Lemer updated FLINK-9221: -- Description: Just like it is very useful to use `DataStream[T]` as a sort of Functor or Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on `SinkFunctions`, so that you can reuse existing complex sink functions, but with a different input type. For example: {code} val bucketingStringSink: SinkFunction[String] = new BucketingSink[String]("...") .setBucketer(new DateTimeBucketer("-MM-dd-HHmm") val bucketingIntListSink: SinkFunction[List[Int]] = bucketingStringSink.contramap[List[Int]](_.mkString(",")) {code} For some more formal motivation behind this, https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great place to start! was: Just like it is very useful to use `DataStream[T]` as a sort of Functor or Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on `SinkFunctions`, so that you can reuse existing complex sink functions, but with a different input type. For example: {code} val bucketingStringSink: SinkFunction[String] = new BucketingSink[String]("...") .setBucketr(new DateTimeBucketer("-MM-dd-HHmm") val bucketingIntListSink: SinkFunction[List[Int]] = bucketingStringSink.contramap[List[Int]](_.mkString(",")) {code} For some more formal motivation behind this, https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great place to start! 
> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task >Reporter: Josh Lemer >Priority: Minor > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketer(new DateTimeBucketer("-MM-dd-HHmm")) > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start!
[jira] [Assigned] (FLINK-9211) Job submission via REST/dashboard does not work on Kubernetes
[ https://issues.apache.org/jira/browse/FLINK-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-9211: --- Assignee: Aljoscha Krettek > Job submission via REST/dashboard does not work on Kubernetes > - > > Key: FLINK-9211 > URL: https://issues.apache.org/jira/browse/FLINK-9211 > Project: Flink > Issue Type: Bug > Components: Client, Web Client >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.5.0 > > > When setting up a cluster on Kubernets according to the documentation it is > possible to upload jar files but when trying to execute them you get an > exception like this: > {code} > org.apache.flink.runtime.rest.handler.RestHandlerException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > JobGraph. > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$2(JarRunHandler.java:113) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:196) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > JobGraph. > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:356) > ... 17 more > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. > ... 18 more > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: > connection timed out: flink-jobmanager/10.105.154.28:8081 > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 
15 more > Caused by: > org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: > connection timed out: flink-jobmanager/10.105.154.28:8081 > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212) > ... 7 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444236#comment-16444236 ] Ted Yu commented on FLINK-9221: --- bq. .setBucketr Should be {{.setBucketer}} > Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] > --- > > Key: FLINK-9221 > URL: https://issues.apache.org/jira/browse/FLINK-9221 > Project: Flink > Issue Type: Task >Reporter: Josh Lemer >Priority: Minor > > Just like it is very useful to use `DataStream[T]` as a sort of Functor or > Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to > have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on > `SinkFunctions`, so that you can reuse existing complex sink functions, but > with a different input type. For example: > {code} > val bucketingStringSink: SinkFunction[String] = > new BucketingSink[String]("...") > .setBucketr(new DateTimeBucketer("-MM-dd-HHmm") > val bucketingIntListSink: SinkFunction[List[Int]] = > bucketingStringSink.contramap[List[Int]](_.mkString(",")) > {code} > For some more formal motivation behind this, > https://typelevel.org/cats/typeclasses/contravariant.html is definitely a > great place to start! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444215#comment-16444215 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182219690 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +51,96 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { --- End diff -- I tested a few other queries with distinct aggregates. * Queries with non-windowed `DISTINCT` aggregations work, but they are they are translated without distinct aggregations. So they changes in this PR are not used. * Queries with `DISTINCT` aggregates and `TUMBLE` or `HOP` windows fail with strange exceptions. Did not look related to these changes, but would be good to check. We don't have to fix these things in this PR (unless it is broken by these changes). In general, I think it would be good to add unit tests for the `AggregationCodeGenerator`. We could generate `GeneratedAggregations` for different configurations, compile them, and check if the result is expected. Actually, we should have done that already. This should also work for state-backend backed views if we embed the test in a HarnessTest. > Add runtime support of distinct filter using MapView > - > > Key: FLINK-8689 > URL: https://issues.apache.org/jira/browse/FLINK-8689 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > This ticket should cover distinct aggregate function support to codegen for > *AggregateCall*, where *isDistinct* fields is set to true. > This can be verified using the following SQL, which is not currently > producing correct results. 
> {code:java} > SELECT > a, > SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND > CURRENT ROW) > FROM > MyTable{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444231#comment-16444231 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182479163 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -93,6 +97,8 @@ class AggregationCodeGenerator( aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], + distinctAggs: Array[Seq[DataViewSpec[_]]], + isStateBackedDataViews: Boolean, --- End diff -- We should add a constructor check for `if (partialResults && isStateBackedDataViews)` and throw an exception if `true`. `partialResults` means that the `Row` with the accumulators has to be emitted which won't work well for state-backed distinct maps that are probably too big. > Add runtime support of distinct filter using MapView > - > > Key: FLINK-8689 > URL: https://issues.apache.org/jira/browse/FLINK-8689 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > This ticket should cover distinct aggregate function support to codegen for > *AggregateCall*, where *isDistinct* fields is set to true. > This can be verified using the following SQL, which is not currently > producing correct results. > {code:java} > SELECT > a, > SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND > CURRENT ROW) > FROM > MyTable{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444221#comment-16444221 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182478446 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -93,6 +97,8 @@ class AggregationCodeGenerator( aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], + distinctAggs: Array[Seq[DataViewSpec[_]]], --- End diff -- I would make this an `Array[Boolean]` and rename to `isDistinctAgg`. We can build the `MapViewSpec`s in the method. We have all the information for that in the other parameters.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444230#comment-16444230 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182773359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -417,13 +530,26 @@ class AggregationCodeGenerator( .stripMargin val create: String = { for (i <- aggs.indices) yield { - j""" - |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); - |${genDataViewFieldSetter(s"acc$i", i)} - |accs.setField( - | $i, - | acc$i);""" -.stripMargin + if (isDistinctAggs(i)) { +j""" + |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); + |$distinctAccType distinctAcc$i = ($distinctAccType) new org.apache.flink.table. + |functions.aggfunctions.DistinctAccumulator(acc$i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} --- End diff -- I think this and the `genAccDataViewFieldSetter` call (both from here and the non-distinct case) can be removed.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444223#comment-16444223 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182496636 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) + + distinctAggs(index) = Seq( --- End diff -- I would generate the `MapViewSpec` in the aggregation code generator
[jira] [Created] (FLINK-9221) Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
Josh Lemer created FLINK-9221: - Summary: Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B] Key: FLINK-9221 URL: https://issues.apache.org/jira/browse/FLINK-9221 Project: Flink Issue Type: Task Reporter: Josh Lemer Just like it is very useful to use `DataStream[T]` as a sort of Functor or Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on `SinkFunction`s, so that you can reuse existing complex sink functions, but with a different input type. For example: {code} val bucketingStringSink: SinkFunction[String] = new BucketingSink[String]("...") .setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm")) val bucketingIntListSink: SinkFunction[List[Int]] = bucketingStringSink.contramap[List[Int]](_.mkString(",")) {code} For some more formal motivation behind this, https://typelevel.org/cats/typeclasses/contravariant.html is definitely a great place to start!
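The proposed `contramap` is the standard contravariant-functor operation. A self-contained Scala sketch with a toy `Sink` trait standing in for Flink's `SinkFunction` (hypothetical, for illustration only):

```scala
import scala.collection.mutable

// Toy stand-in for SinkFunction; `send` plays the role of `invoke`.
trait Sink[A] { self =>
  def send(a: A): Unit

  // contramap: adapt a Sink[A] into a Sink[B] by pre-applying f: B => A.
  def contramap[B](f: B => A): Sink[B] = new Sink[B] {
    def send(b: B): Unit = self.send(f(b))
  }
}

val buffer = mutable.ListBuffer.empty[String]
val stringSink: Sink[String] = new Sink[String] {
  def send(s: String): Unit = buffer += s
}

// Reuse the existing string sink for a different input type:
val intListSink: Sink[List[Int]] = stringSink.contramap[List[Int]](_.mkString(","))
intListSink.send(List(1, 2, 3)) // buffer now holds "1,2,3"
```

The adapted sink keeps all the behavior of the original; only the input type changes, which is exactly the reuse the ticket asks for.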
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444216#comment-16444216 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182226785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") --- End diff -- Reword error message to `"DISTINCT aggregations with multiple parameters not fully supported yet."`.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444218#comment-16444218 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182254735 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin --- End diff -- We need to forward the distinct maps as well. `partialResults` is used when an operator needs to emit partial aggregation results such as a combine function in batch execution. So we don't need to distinguish the `isDistinctAggs(i)` case here.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444222#comment-16444222 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182233134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList --- End diff -- add more comments for the `aggCall.isDistinct` branch
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444224#comment-16444224 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182753219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- If we change the input parameter, we have the `Array[Boolean]` already
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444229#comment-16444229 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182768726 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} + |${accTypes(i)} acc$i = (${accTypes(i)}) distinctAcc$i.getRealAcc(); + |${genAccDataViewFieldSetter(s"acc$i", i)} + |output.setField( + | ${aggMapping(i)}, + | baseClass$i.getValue(acc$i));""".stripMargin +} else { --- End diff -- both cases share a lot of code. We could only retrieve `acc$i` differently. Does that make sense or fragment the code too much? 
Same would apply for `accumulate()` and `retract()`.
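The factoring suggested here can be sketched as: generate the shared `getValue` body once and vary only the snippet that retrieves the accumulator. This is a plain-Scala sketch of string-based codegen; the helper and its parameters are hypothetical, though the generated names (`distinctAcc`, `accs`, `baseClass`) follow the diff above.

```scala
// Generate the per-aggregate getValue snippet; only the accumulator
// retrieval differs between the distinct and non-distinct cases.
def genGetValue(i: Int, accType: String, isDistinct: Boolean): String = {
  val getAcc =
    if (isDistinct)
      s"$accType acc$i = ($accType) distinctAcc$i.getRealAcc();" // unwrap the wrapper
    else
      s"$accType acc$i = ($accType) accs.getField($i);"          // read directly
  s"""|$getAcc
      |output.setField($i, baseClass$i.getValue(acc$i));""".stripMargin
}
```

Sharing the tail this way keeps the two branches from drifting apart, at the cost of one extra level of indirection in the generator.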
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444225#comment-16444225 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182496465 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- Does the approach also work for `null` values in both MapViews? If not, we can use a `Row(1)` that serializes a bitmask for `null` values.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444226#comment-16444226 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182489141 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- +1
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444219#comment-16444219 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182469110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala --- @@ -70,7 +70,12 @@ trait OverAggregate { val aggStrings = namedAggregates.map(_.getKey).map( a => s"${a.getAggregation}(${ -if (a.getArgList.size() > 0) { +val prefix = if (a.isDistinct) { --- End diff -- In case we also want to support group-windowed DISTINCT aggregates, we would need to change `CommonAggregate.aggregationToString()` as well.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444228#comment-16444228 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182768521 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} --- End diff -- we don't need this statement
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444220#comment-16444220 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182481625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoke after distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, Integer]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, Integer]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, Integer]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { + if (mapView.contains(element)) { --- End diff -- +1
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444217#comment-16444217 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182218521 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) --- End diff -- I would not create the `MapViewSpec`s here but do that in the code gen function. Here we should create an `Array[Boolean]` to flag distinct aggregations.
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444227#comment-16444227 ] ASF GitHub Bot commented on FLINK-8689: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182753514 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- Instead we can create the `DataViewSpecs` here.
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182496636 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) + + distinctAggs(index) = Seq( --- End diff -- I would generate the `MapViewSpec` in the aggregation code generator ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182226785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") --- End diff -- Reword error message to `"DISTINCT aggregations with multiple parameters not fully supported yet."`. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182478446 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -93,6 +97,8 @@ class AggregationCodeGenerator( aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], + distinctAggs: Array[Seq[DataViewSpec[_]]], --- End diff -- I would make this an `Array[Boolean]` and rename to `isDistinctAgg`. We can build the `MapViewSpec`s in the method. We have all the information for that in the other parameters. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182481625 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.aggfunctions + +import java.lang.{Iterable => JIterable} +import java.util.{Map => JMap} +import org.apache.flink.table.api.dataview.MapView + +/** + * The base class for accumulator wrapper when applying distinct aggregation. + * @param realAcc the actual accumulator which gets invoked after the distinct filter. + * @param mapView the [[MapView]] element used to store the distinct filter hash map. + * @tparam E the element type for the distinct filter hash map. + * @tparam ACC the accumulator type for the realAcc. 
+ */ +class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, Integer]) { + def this() { +this(null.asInstanceOf[ACC], new MapView[E, Integer]()) + } + + def this(realAcc: ACC) { +this(realAcc, new MapView[E, Integer]()) + } + + def getRealAcc: ACC = realAcc + + def canEqual(a: Any): Boolean = a.isInstanceOf[DistinctAccumulator[E, ACC]] + + override def equals(that: Any): Boolean = +that match { + case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && +this.mapView == that.mapView + case _ => false +} + + def add(element: E): Boolean = { +if (element != null) { + if (mapView.contains(element)) { --- End diff -- +1 ---
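[Editorial note] The `DistinctAccumulator` diff above is cut off before its `add`/`remove` bodies, but the mechanism under review is reference counting: each distinct element maps to an occurrence count, and only the first occurrence (and the last retraction) is forwarded to the wrapped "real" accumulator. A minimal, hypothetical Java sketch of that counting logic (names illustrative; Flink's actual class wraps a `MapView` so large maps can be kept in the state backend):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the distinct-filter reference counting discussed in the review.
// "DistinctFilter" is an illustrative name, not a Flink class.
class DistinctFilter<E> {
    private final Map<E, Integer> counts = new HashMap<>();

    /** Returns true iff the element is seen for the first time,
     *  i.e. the wrapped accumulator should accumulate it. */
    boolean add(E element) {
        Integer current = counts.get(element);
        if (current == null) {
            counts.put(element, 1);
            return true;              // first occurrence: forward downstream
        }
        counts.put(element, current + 1);
        return false;                 // duplicate: filtered out
    }

    /** Returns true iff the last occurrence is retracted,
     *  i.e. the wrapped accumulator should retract it. */
    boolean remove(E element) {
        Integer current = counts.get(element);
        if (current == null) {
            return false;             // retraction of an unseen element: ignore
        }
        if (current == 1) {
            counts.remove(element);
            return true;              // last occurrence gone: retract downstream
        }
        counts.put(element, current - 1);
        return false;
    }
}
```

The boolean return value is exactly what the generated `accumulate()`/`retract()` code would branch on before calling the real aggregate function.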
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182479163 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -93,6 +97,8 @@ class AggregationCodeGenerator( aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], + distinctAggs: Array[Seq[DataViewSpec[_]]], + isStateBackedDataViews: Boolean, --- End diff -- We should add a constructor check for `if (partialResults && isStateBackedDataViews)` and throw an exception if `true`. `partialResults` means that the `Row` with the accumulators has to be emitted which won't work well for state-backed distinct maps that are probably too big. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182768521 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} --- End diff -- we don't need this statement ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182218521 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) --- End diff -- I would not create the `MapViewSpec`s here but do that in the code gen function. Here we should create an `Array[Boolean]` to flag distinct aggregations. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182496465 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- Does the approach also work for `null` values in both MapViews? If not, we can use a `Row(1)` that serializes a bitmask for `null` values. ---
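[Editorial note] The `null`-handling concern above stems from the fact that state-backend map keys generally cannot be `null`. The suggested `Row(1)`-with-bitmask workaround can be sketched as a single-field key wrapper that encodes "null" explicitly; this is a hypothetical illustration, not Flink's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative wrapper: makes null a legal, stable key for a distinct map,
// analogous to storing the element in a Row(1) with a null bitmask.
final class NullableKey<E> {
    final E value;          // may be null
    final boolean isNull;   // explicit null flag, the "bitmask" for one field

    NullableKey(E value) {
        this.value = value;
        this.isNull = (value == null);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof NullableKey)) {
            return false;
        }
        NullableKey<?> other = (NullableKey<?>) o;
        return isNull == other.isNull && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return isNull ? 0 : value.hashCode();
    }
}
```

With such a wrapper, a distinct filter can count `null` inputs like any other value instead of rejecting them at the state backend.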
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182773359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -417,13 +530,26 @@ class AggregationCodeGenerator( .stripMargin val create: String = { for (i <- aggs.indices) yield { - j""" - |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); - |${genDataViewFieldSetter(s"acc$i", i)} - |accs.setField( - | $i, - | acc$i);""" -.stripMargin + if (isDistinctAggs(i)) { +j""" + |${accTypes(i)} acc$i = (${accTypes(i)}) ${aggs(i)}.createAccumulator(); + |$distinctAccType distinctAcc$i = ($distinctAccType) new org.apache.flink.table. + |functions.aggfunctions.DistinctAccumulator(acc$i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} --- End diff -- I think this and the `genAccDataViewFieldSetter` call (both from here and the non-distinct case) can be removed. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182219690 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +51,96 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { --- End diff -- I tested a few other queries with distinct aggregates. * Queries with non-windowed `DISTINCT` aggregations work, but they are translated without distinct aggregations, so the changes in this PR are not used. * Queries with `DISTINCT` aggregates and `TUMBLE` or `HOP` windows fail with strange exceptions. They did not look related to these changes, but it would be good to check. We don't have to fix these things in this PR (unless something is broken by these changes). In general, I think it would be good to add unit tests for the `AggregationCodeGenerator`. We could generate `GeneratedAggregations` for different configurations, compile them, and check if the result is as expected. Actually, we should have done that already. This should also work for state-backend-backed views if we embed the test in a HarnessTest. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182489141 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList + // Only support single argument for distinct operation + if (argList.size() > 1) { +throw TableException( + "Cannot apply distinct filter on multiple input argument fields at this moment!") + } + val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType + val fieldIndex = aggFieldIndexes(index)(0) + val mapViewTypeInfo = new MapViewTypeInfo( +FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO) --- End diff -- +1 ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182233134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1439,7 +1467,47 @@ object AggregateUtil { } } -(aggFieldIndexes, aggregates, accTypes, accSpecs) +// create distinct accumulator filter argument +val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + +aggregateCalls.zipWithIndex.foreach { + case (aggCall, index) => +if (aggCall.isDistinct) { + val argList: util.List[Integer] = aggCall.getArgList --- End diff -- add more comments for the `aggCall.isDistinct` branch ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182753219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } -// initialize and create data views -addReusableDataViews() +// get distinct filter of acc fields for each aggregate functions +val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" +val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- If we change the input parameter, we have the `Array[Boolean]` already ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182254735 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin --- End diff -- We need to forward the distinct maps as well. `partialResults` is used when an operator needs to emit partial aggregation results such as a combine function in batch execution. So we don't need to distinguish the `isDistinctAggs(i)` case here. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182768726 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -327,19 +392,41 @@ class AggregationCodeGenerator( for (i <- aggs.indices) yield if (partialResults) { -j""" - |output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin +if (isDistinctAggs(i)) { + + j""" + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin +} else { + j""" + |output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin +} } else { -j""" - |org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); - |${genDataViewFieldSetter(s"acc$i", i)} - |output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue(acc$i));""".stripMargin +if (isDistinctAggs(i)) { + j""" + |org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + |$distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i); + |${genDistinctDataViewFieldSetter(s"distinctAcc$i", i)} + |${accTypes(i)} acc$i = (${accTypes(i)}) distinctAcc$i.getRealAcc(); + |${genAccDataViewFieldSetter(s"acc$i", i)} + |output.setField( + | ${aggMapping(i)}, + | baseClass$i.getValue(acc$i));""".stripMargin +} else { --- End diff -- both cases share a lot of code. We could only retrieve `acc$i` differently. Does that make sense or fragment the code too much? Same would apply for `accumulate()` and `retract()`. ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r182469110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala --- @@ -70,7 +70,12 @@ trait OverAggregate { val aggStrings = namedAggregates.map(_.getKey).map( a => s"${a.getAggregation}(${ -if (a.getArgList.size() > 0) { +val prefix = if (a.isDistinct) { --- End diff -- In case we also want to support group-windowed DISTINCT aggregates, we would need to change `CommonAggregate.aggregationToString()` as well. ---
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444174#comment-16444174 ] ASF GitHub Bot commented on FLINK-8836: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5880#discussion_r182777472 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java --- @@ -24,21 +24,71 @@ import org.apache.flink.core.testutils.BlockerSync; import org.apache.flink.core.testutils.CheckedThread; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.io.Serializable; import static org.junit.Assert.fail; /** * This tests that the {@link KryoSerializer} properly fails when accessed by two threads - * concurrently. + * concurrently and that Kryo serializers are properly duplicated to use them in different threads. * * Important: This test only works if assertions are activated (-ea) on the JVM * when running tests. 
*/ public class KryoSerializerConcurrencyTest { + @Test + public void testDuplicateSerializerWithDefaultSerializerClass() { --- End diff -- test names are mixed up, this and the next one should be switched > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.5.0 > > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444169#comment-16444169 ] ASF GitHub Bot commented on FLINK-8836: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5880#discussion_r182776735 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java --- @@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig executionConfig){ * Copy-constructor that does not copy transient fields. They will be initialized once required. */ protected KryoSerializer(KryoSerializer toCopy) { - defaultSerializers = toCopy.defaultSerializers; - defaultSerializerClasses = toCopy.defaultSerializerClasses; - kryoRegistrations = toCopy.kryoRegistrations; + this.type = checkNotNull(toCopy.type, "Type class cannot be null."); + this.defaultSerializerClasses = toCopy.defaultSerializerClasses; + this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size()); + this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size()); + + // deep copy the serializer instances in defaultSerializers + for (Map.Entryentry : + toCopy.defaultSerializers.entrySet()) { - type = toCopy.type; - if(type == null){ - throw new NullPointerException("Type class cannot be null."); + this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue())); + } + + // deep copy the serializer instances in kryoRegistrations + for (Map.Entry entry : toCopy.kryoRegistrations.entrySet()) { --- End diff -- The problem is that we don't have the `ExecutionConfig` in the copy constructor. 
> Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.5.0 > > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
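[Editorial note] The bug fixed above is that the old `KryoSerializer` copy-constructor assigned the maps of stateful serializer instances directly, so two "duplicates" shared the same mutable objects across threads. A plain-Java sketch of the difference (no Kryo dependency; class names are illustrative — in Flink, `deepCopySerializer` round-trips the configured serializer instead of constructing a fresh one):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for a Kryo Serializer with mutable, per-thread working state.
class StatefulSerializer {
    int scratch;
}

class SerializerHolder {
    final Map<Class<?>, StatefulSerializer> serializers;

    SerializerHolder() {
        this.serializers = new LinkedHashMap<>();
        this.serializers.put(String.class, new StatefulSerializer());
    }

    // Copy-constructor: with deep == false this reproduces the bug
    // (both copies share the same StatefulSerializer instances);
    // with deep == true each copy gets its own instances.
    SerializerHolder(SerializerHolder toCopy, boolean deep) {
        this.serializers = new LinkedHashMap<>();
        for (Map.Entry<Class<?>, StatefulSerializer> e : toCopy.serializers.entrySet()) {
            this.serializers.put(e.getKey(),
                deep ? new StatefulSerializer() : e.getValue());
        }
    }
}
```

Mutating the original after a shallow copy is visible in the copy, which is exactly why concurrent use of two shallowly duplicated serializers is unsafe.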
[jira] [Updated] (FLINK-8979) Extend Kafka end-to-end tests to run with different versions
[ https://issues.apache.org/jira/browse/FLINK-8979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8979: Fix Version/s: (was: 1.5.0) 1.5.1 1.6.0 > Extend Kafka end-to-end tests to run with different versions > > > Key: FLINK-8979 > URL: https://issues.apache.org/jira/browse/FLINK-8979 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > The current {{Kafka}} end-to-end test only runs with Kafka 0.10. We should > extend the test to also run with > * Kafka 0.8 > * Kafka 0.9 > * Kafka 0.11 > Additionally we should change the test job to not be embarrassingly parallel > by introducing a shuffle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8982) End-to-end test: Queryable state
[ https://issues.apache.org/jira/browse/FLINK-8982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8982: Fix Version/s: (was: 1.5.0) 1.5.1 1.6.0 > End-to-end test: Queryable state > > > Key: FLINK-8982 > URL: https://issues.apache.org/jira/browse/FLINK-8982 > Project: Flink > Issue Type: Sub-task > Components: Queryable State, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > We should add an end-to-end test which verifies that {{Queryable State}} is > working. > [~florianschmidt] and [~kkl0u] could you please provide more details for the > description. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8977) End-to-end test: Manually resume job after terminal failure
[ https://issues.apache.org/jira/browse/FLINK-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8977: Fix Version/s: (was: 1.5.0) 1.5.1 1.6.0 > End-to-end test: Manually resume job after terminal failure > --- > > Key: FLINK-8977 > URL: https://issues.apache.org/jira/browse/FLINK-8977 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > We should add an end-to-end test which verifies that a job can be resumed > manually after a terminal job failure if there is a checkpoint. In order to > do that we should > # run the general purpose testing job FLINK-8971 > # wait for the completion of a checkpoint > # Trigger a failure which leads to a terminal failure > # Resume the job from the retained checkpoint > This end-to-end test should run with all state backend combinations: RocksDB > (incremental/full, async/sync), FsStateBackend (async/sync). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8286) Investigate Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444149#comment-16444149 ] Aljoscha Krettek commented on FLINK-8286: - [~suez1224] Is there any update on this? > Investigate Flink-Yarn-Kerberos integration for flip-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Task > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > We've found some issues with the Flink-Yarn-Kerberos integration in the > current deployment model, we will need to investigate and test it for flip-6 > when it's ready. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7431) test instability in JobManagerFailsITCase
[ https://issues.apache.org/jira/browse/FLINK-7431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7431. --- Resolution: Cannot Reproduce Closing because we haven't seen this in a while and also because it's tests for the legacy JobManager/TaskManager. > test instability in JobManagerFailsITCase > - > > Key: FLINK-7431 > URL: https://issues.apache.org/jira/browse/FLINK-7431 > Project: Flink > Issue Type: Improvement > Components: JobManager, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > In a branch with unrelated changes: > {code} > A TaskManager should go into a clean state in case of a JobManager > failure(org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase) > Time elapsed: 121.105 sec <<< FAILURE! > java.lang.AssertionError: assertion failed: timeout (119126026463 > nanoseconds) during expectMsg while waiting for Acknowledge > at scala.Predef$.assert(Predef.scala:170) > at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:338) > at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:315) > at akka.testkit.TestKit.expectMsg(TestKit.scala:718) > at > org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerFailsITCase.scala:118) > at > org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104) > at > org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104) > at akka.testkit.TestKitBase$class.within(TestKit.scala:296) > at akka.testkit.TestKit.within(TestKit.scala:718) > at akka.testkit.TestKitBase$class.within(TestKit.scala:310) > at akka.testkit.TestKit.within(TestKit.scala:718) > at > 
org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply$mcV$sp(JobManagerFailsITCase.scala:104) > at > org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85) > at > org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85) > at > org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) > at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) > at org.scalatest.Transformer.apply(Transformer.scala:22) > at org.scalatest.Transformer.apply(Transformer.scala:20) > at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953) > at org.scalatest.Suite$class.withFixture(Suite.scala:1122) > at > org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase.withFixture(JobManagerFailsITCase.scala:37) > {code} > https://travis-ci.org/NicoK/flink/jobs/263422429 > full logs: https://transfer.sh/bRKz7/399.4.tar.gz (unfortunately not much > more info there, as far as I can see) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9220) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444092#comment-16444092 ] Timo Walther commented on FLINK-9220: - Which package contains your function? According to the exception {{Cannot determine simple type name "com"}}, it seems that the compiler cannot find your package. Are you executing this program in your IDE or on a (local) Flink cluster? If on a cluster, are you sure that your function's class is in the jar that you submit to the cluster? > Table program cannot be compiled > > > Key: FLINK-9220 > URL: https://issues.apache.org/jira/browse/FLINK-9220 > Project: Flink > Issue Type: Bug >Reporter: Saurabh Garg >Priority: Major > > Flink job gets failed with scalar UDF. I am using Flink 1.4. Issue came with > Scalar UDF > Below is the error logs: > > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at > org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at > java.lang.Thread.run(Thread.java:748) Caused by: > org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot > determine simple type name "com" at >
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at > org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at > org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at > org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064) > at > org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059) > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at > org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at > org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at > org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at > org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at > org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at > org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at > org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at > org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at > org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082) > at > org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077) > at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at > org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at > org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at > org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at > 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at > org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at > org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at > org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at > org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at > org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at > org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080) > at > org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077) > at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at > org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at > org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at >
[jira] [Commented] (FLINK-9155) Provide message context information in DeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444080#comment-16444080 ] Tzu-Li (Gordon) Tai commented on FLINK-9155: I think we can implement both logging and metrics to track this. For metrics, we should be able to use the user variable functionality to have skip counters that can be grouped by topic / partition. This should allow more prompt alerts of skipped messages, and users can then consult the logs for more details on the offending record. For logging, the information should contain topic, partition, and offset for debugging. > Provide message context information in DeserializationSchema > > > Key: FLINK-9155 > URL: https://issues.apache.org/jira/browse/FLINK-9155 > Project: Flink > Issue Type: Improvement >Reporter: Alex Smirnov >Priority: Minor > > There's no way to retrieve more information about corrupted message in the > DeserializationSchema class. It is only possible to return null, which is a > signal to skip the message, and to throw an exception, which will cause job > failure. > For investigation purposes it would be good to have more information, like: > * kafka topic from which the message came from > ** in Flink 1.4, it is possible to subscribe using Pattern, so topic name is > not always evident > * kafka topic offset > The idea is to write this information into the log file for further analysis. > Having topic name and offset allows to retrieve the message and investigate > it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
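A minimal, self-contained sketch of the logging-plus-counter behaviour Gordon describes. The interface below is invented so the snippet compiles without Flink on the classpath; in the Flink 1.4 Kafka connector, {{KeyedDeserializationSchema}} is the interface that already receives topic, partition, and offset in its {{deserialize(...)}} signature.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Invented stand-in for a context-aware deserialization interface; Flink's
// KeyedDeserializationSchema plays this role in the real Kafka connector.
interface ContextAwareDeserializer<T> {
    T deserialize(byte[] message, String topic, int partition, long offset);
}

// Sketch of the proposed behaviour: corrupt records are skipped (null),
// the skip is counted, and the message context is logged for debugging.
class SkippingIntDeserializer implements ContextAwareDeserializer<Integer> {
    final AtomicLong skippedRecords = new AtomicLong(); // would back a user-scoped metric counter

    @Override
    public Integer deserialize(byte[] message, String topic, int partition, long offset) {
        try {
            return Integer.parseInt(new String(message, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            skippedRecords.incrementAndGet();
            System.err.printf("Skipping corrupt record: topic=%s, partition=%d, offset=%d%n",
                    topic, partition, offset);
            return null; // null signals "skip this record" to the connector
        }
    }
}
```

The counter would be grouped by topic / partition via metric user variables, while the log line carries enough context to fetch and inspect the offending record.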
[jira] [Resolved] (FLINK-9204) Improve visibility of records being skipped by the `DeserializationSchema` in the Kafka / Kinesis connectors
[ https://issues.apache.org/jira/browse/FLINK-9204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-9204. Resolution: Duplicate > Improve visibility of records being skipped by the `DeserializationSchema` in > the Kafka / Kinesis connectors > > > Key: FLINK-9204 > URL: https://issues.apache.org/jira/browse/FLINK-9204 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector, Kinesis Connector, Metrics >Reporter: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.6.0 > > > Currently, users have to write some log for visibility if they are skipping > some record (by returning {{null}} from the deserialization schema). > We should have better support for this natively in the connectors, either by > adding logs if {{null}} is returned by the user deserialization schema, or > adding counter metrics to track how many records were skipped so far. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9220) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444072#comment-16444072 ] Saurabh Garg commented on FLINK-9220: - I registered this ScalarFunction on the StreamTableEnvironment. The Flink job starts, reaches the RUNNING state, and then fails.
[jira] [Commented] (FLINK-9220) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444070#comment-16444070 ] Saurabh Garg commented on FLINK-9220: - Query:
{code:sql}
select id, HashCode(id) as hashcode from input1
{code}
The function is similar to the one provided in the documentation:
{code:java}
public class HashCode extends ScalarFunction {

    private int factor = 12;

    public HashCode(int factor) {
        this.factor = factor;
    }

    public HashCode() {}

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
{code}
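For reference, the HashCode function above as a compilable stand-alone sketch. The {{ScalarFunction}} base class (from flink-table) is omitted here so the snippet has no Flink dependency, and the class is deliberately left non-public so it can live in any source file; in a real job it must be a public class extending {{org.apache.flink.table.functions.ScalarFunction}}, inside a named package that is shipped in the submitted jar.

```java
// Stand-alone version of the HashCode UDF discussed above; in a real Flink job
// this class extends org.apache.flink.table.functions.ScalarFunction.
class HashCode {
    private int factor = 12;

    public HashCode(int factor) {
        this.factor = factor;
    }

    public HashCode() {} // no-arg constructor keeps the default factor of 12

    // The Table API dispatches to eval(...) during code generation.
    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
```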
[jira] [Updated] (FLINK-9220) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saurabh Garg updated FLINK-9220: Description: The Flink job fails with a scalar UDF. I am using Flink 1.4. The issue came with a scalar UDF. Below are the error logs: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35) at org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 18: Cannot determine simple type name "com" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at
org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064) at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082) at org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077) at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080) at org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077) at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8591) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4708) at org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4071) at org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4044) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874) at
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444068#comment-16444068 ] ASF GitHub Bot commented on FLINK-8836: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5880#discussion_r182750453 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java --- @@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig executionConfig){ * Copy-constructor that does not copy transient fields. They will be initialized once required. */ protected KryoSerializer(KryoSerializer toCopy) { - defaultSerializers = toCopy.defaultSerializers; - defaultSerializerClasses = toCopy.defaultSerializerClasses; - kryoRegistrations = toCopy.kryoRegistrations; + this.type = checkNotNull(toCopy.type, "Type class cannot be null."); + this.defaultSerializerClasses = toCopy.defaultSerializerClasses; + this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size()); + this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size()); + + // deep copy the serializer instances in defaultSerializers + for (Map.Entryentry : + toCopy.defaultSerializers.entrySet()) { - type = toCopy.type; - if(type == null){ - throw new NullPointerException("Type class cannot be null."); + this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue())); + } + + // deep copy the serializer instances in kryoRegistrations + for (Map.Entry entry : toCopy.kryoRegistrations.entrySet()) { --- End diff -- One alternative approach to this loop (though I'm not sure would be better), is in the `buildKryoRegistrationsMethod` we always make a copy of the `ExecutionConfig.SerializableSerializer` when instantiating its corresponding `KryoRegistration`. 
See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. Here we can make a copy already when building the registrations. Then, when duplicating the `KryoSerializer`, for duplicating the registrations, this would only be a matter of calling `buildKryoRegistrations` again with the execution config because that method would handle stateful serializer registrations properly. IMO, this seems like a cleaner solution. What do you think? > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.5.0 > > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
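The hazard and the fix can be illustrated without Kryo. In this invented toy model, a copy constructor either shares or deep-copies a map of stateful helper objects; the shallow variant mirrors the pre-fix behaviour quoted above, where two "duplicates" end up sharing the same serializer instances.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the bug: a "serializer" holding mutable per-instance state
// (standing in for a registered Kryo default serializer instance).
class StatefulSerializer {
    final StringBuilder buffer = new StringBuilder(); // not safe to share across threads

    StatefulSerializer deepCopy() {
        return new StatefulSerializer();
    }
}

// Illustrative copy constructor contrasting the broken shallow copy with the
// deep copy the fix introduces; all names here are invented for this sketch.
class ToyKryoSerializer {
    final Map<Class<?>, StatefulSerializer> defaultSerializers;

    ToyKryoSerializer() {
        defaultSerializers = new LinkedHashMap<>();
        defaultSerializers.put(String.class, new StatefulSerializer());
    }

    ToyKryoSerializer(ToyKryoSerializer toCopy, boolean deep) {
        defaultSerializers = new LinkedHashMap<>();
        for (Map.Entry<Class<?>, StatefulSerializer> e : toCopy.defaultSerializers.entrySet()) {
            // shallow: the duplicate shares the stateful instance with the original
            defaultSerializers.put(e.getKey(), deep ? e.getValue().deepCopy() : e.getValue());
        }
    }
}
```

With the shallow copy, both duplicates mutate the same {{StatefulSerializer}}, which is exactly what makes concurrent use of {{duplicate()}}-ed serializers unsafe.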
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444066#comment-16444066 ] ASF GitHub Bot commented on FLINK-8836: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5880#discussion_r182749960 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java --- @@ -140,14 +140,37 @@ public KryoSerializer(Class type, ExecutionConfig executionConfig){ * Copy-constructor that does not copy transient fields. They will be initialized once required. */ protected KryoSerializer(KryoSerializer toCopy) { - defaultSerializers = toCopy.defaultSerializers; - defaultSerializerClasses = toCopy.defaultSerializerClasses; - kryoRegistrations = toCopy.kryoRegistrations; + this.type = checkNotNull(toCopy.type, "Type class cannot be null."); + this.defaultSerializerClasses = toCopy.defaultSerializerClasses; + this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size()); + this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size()); + + // deep copy the serializer instances in defaultSerializers + for (Map.Entryentry : + toCopy.defaultSerializers.entrySet()) { - type = toCopy.type; - if(type == null){ - throw new NullPointerException("Type class cannot be null."); + this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue())); + } + + // deep copy the serializer instances in kryoRegistrations + for (Map.Entry entry : toCopy.kryoRegistrations.entrySet()) { --- End diff -- One alternative approach to this loop (though I'm not sure would be better), is in the `buildKryoRegistrationsMethod` we always make a copy of the `ExecutionConfig.SerializableSerializer` when instantiating its corresponding `KryoRegistration`. 
See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. Here we can make a copy already when building the registrations. Then, when duplicating the `KryoSerializer`, for duplicating the registrations, this would only be a matter of calling `buildKryoRegistrations` again from the execution config because that method would handle stateful serializer registrations properly. IMO, this seems like a cleaner solution. What do you think? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions
[ https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444055#comment-16444055 ] ASF GitHub Bot commented on FLINK-8785: --- Github user buptljy commented on the issue: https://github.com/apache/flink/pull/5877

@zentol ok, I thought it was a small change before. So you mean we can improve the messages of the exceptions and have them reported more properly? I will test all the cases in the JobSubmissionFailsITCase.

> JobSubmitHandler does not handle JobSubmissionExceptions
> --------------------------------------------------------
>
>                 Key: FLINK-8785
>                 URL: https://issues.apache.org/jira/browse/FLINK-8785
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, JobManager, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: buptljy
>            Priority: Critical
>              Labels: flip-6
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}}, fails with a {{JobSubmissionException}}, the {{JobSubmitHandler}} returns "Internal server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as tested by the {{JobSubmissionFailsITCase}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
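As a rough illustration of the handling the issue asks for, here is a hedged sketch of unwrapping a failed submission and surfacing its cause instead of a blanket "Internal server error". The class and method names below are illustrative only, not Flink's actual REST handler API; the pattern is simply: unwrap the async wrapper, branch on the known exception type, and fall back to 500 for anything unexpected.

```java
import java.util.concurrent.CompletionException;

// Illustrative stand-in for the real exception type of the same name.
class JobSubmissionException extends Exception {
    JobSubmissionException(String message) { super(message); }
}

// Minimal stand-in for an HTTP error payload.
class ErrorResponse {
    final int status;
    final String message;
    ErrorResponse(int status, String message) { this.status = status; this.message = message; }
}

public class SubmitHandlerSketch {
    static ErrorResponse toResponse(Throwable t) {
        // Async gateways typically wrap the real cause in a CompletionException.
        Throwable cause =
            (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        if (cause instanceof JobSubmissionException) {
            // Signal a client-visible submission failure, carrying the cause's message.
            return new ErrorResponse(400, "Job submission failed: " + cause.getMessage());
        }
        // Anything truly unexpected stays an internal server error.
        return new ErrorResponse(500, "Internal server error");
    }

    public static void main(String[] args) {
        ErrorResponse r = toResponse(
            new CompletionException(new JobSubmissionException("faulty execution graph")));
        System.out.println(r.status + " " + r.message);
    }
}
```

With this shape, a faulty execution graph (the `JobSubmissionFailsITCase` scenario) would come back as a 400 with a meaningful message rather than a generic 500.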
[jira] [Commented] (FLINK-8980) End-to-end test: BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16444044#comment-16444044 ] ASF GitHub Bot commented on FLINK-8980: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5813

Sounds good @twalthr !

> End-to-end test: BucketingSink
> ------------------------------
>
>                 Key: FLINK-8980
>                 URL: https://issues.apache.org/jira/browse/FLINK-8980
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> In order to verify the {{BucketingSink}}, we should add an end-to-end test which verifies that the {{BucketingSink}} does not lose data under failures.
> An idea would be to have a CountUp job which simply counts up a persisted counter. The emitted values will be written to disk by the {{BucketingSink}}. We should then randomly kill Flink processes (the cluster entrypoint and TaskExecutors) to simulate failures. Even after these failures, the written files should contain the correct sequence of numbers.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
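The verification step the issue describes can be sketched as a small check over the values recovered from the bucket files: after the induced failures, the concatenated output should be exactly the sequence 0..N, with no gaps and no duplicates. This is an illustration of that check only (class and method names are made up here), not the actual end-to-end test script.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SequenceCheck {
    // Returns true iff 'values' is a permutation of 0..values.size()-1,
    // i.e. the CountUp output survived the failures with no gap and no duplicate.
    static boolean isExactSequence(List<Long> values) {
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        for (int i = 0; i < sorted.size(); i++) {
            if (sorted.get(i) != i) {
                return false; // gap, duplicate, or stray value detected
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Order does not matter, only completeness and uniqueness.
        System.out.println(isExactSequence(Arrays.asList(2L, 0L, 1L, 3L))); // true
        System.out.println(isExactSequence(Arrays.asList(0L, 1L, 1L, 3L))); // false: 1 duplicated, 2 missing
    }
}
```

Note that this check only passes under exactly-once output; with at-least-once semantics, replayed duplicates would have to be tolerated instead of rejected.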