[jira] [Commented] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed

2024-05-08 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844662#comment-17844662
 ] 

Kenneth William Krugler commented on FLINK-35215:
-

Hi [~fanrui] - I added a comment to the PR.

> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed
> -
>
> Key: FLINK-35215
> URL: https://issues.apache.org/jira/browse/FLINK-35215
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-25-14-57-55-231.png, 
> image-2024-04-25-15-00-32-410.png
>
>
> The performance of serializerKryo and serializerKryoWithoutRegistration has 
> regressed [1][2]. I checked recent commits and found that FLINK-34954 changed 
> the related logic.
>  
> [1] 
> [http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryo=on=on=off=3=50]
> [2] 
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryoWithoutRegistration=on=on=off=3=50
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed

2024-04-27 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841547#comment-17841547
 ] 

Kenneth William Krugler commented on FLINK-35215:
-

Hi [~fanrui] - thanks for the detailed writeup! Two comments...

# I was surprised that Case2 (above) didn't cause a test to fail, until I 
realized that the previous fix hadn't added a test for failure with 0 
serialized bytes. I would recommend adding this, so that your changes don't 
accidentally re-introduce this bug.
# I still am suspicious of the change in performance. The initial fix changes a 
while(true) loop to one that has a simple comparison, and inside the loop there 
are calls to methods that are going to be doing significant work. So I really 
don't see how that change could have caused a significant performance 
regression, unless I'm missing something.

> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed
> -
>
> Key: FLINK-35215
> URL: https://issues.apache.org/jira/browse/FLINK-35215
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-25-14-57-55-231.png, 
> image-2024-04-25-15-00-32-410.png
>
>
> The performance of serializerKryo and serializerKryoWithoutRegistration has 
> regressed [1][2]. I checked recent commits and found that FLINK-34954 changed 
> the related logic.
>  
> [1] 
> [http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryo=on=on=off=3=50]
> [2] 
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryoWithoutRegistration=on=on=off=3=50
>  
>  





[jira] [Commented] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed

2024-04-23 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840259#comment-17840259
 ] 

Kenneth William Krugler commented on FLINK-35215:
-

Hi [~fanrui] - I haven't been following how Flink determines the severity & 
nature of performance regressions, but labeling this a Blocker Bug seems...odd 
to me. The speedtest I looked at showed a delta that was close to normal 
fluctuations in performance results.

> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed
> -
>
> Key: FLINK-35215
> URL: https://issues.apache.org/jira/browse/FLINK-35215
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Priority: Blocker
>
> The performance of serializerKryo and serializerKryoWithoutRegistration has 
> regressed [1][2]. I checked recent commits and found that FLINK-34954 changed 
> the related logic.
>  
> [1] 
> [http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryo=on=on=off=3=50]
> [2] 
> http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryoWithoutRegistration=on=on=off=3=50
>  
>  





[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-11 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836325#comment-17836325
 ] 

Kenneth William Krugler commented on FLINK-35076:
-

Hi [~elon] - please post these questions about the impact of idleness and how 
to rebalance on Stack Overflow, or the Flink user list. That way the Q can 
benefit the entire community, thanks!

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company, we have a multi-stream join scenario for which we are making 
> modifications based on Flink watermark alignment. I found that the final join 
> output experiences serious shake. I analyzed the reasons: an upstream topic 
> has more than 300 partitions. The number of partitions requested for this 
> topic is too large, causing some partitions to frequently experience 
> intermittent periods with no writes (QPS=0). This phenomenon is more serious 
> between 2 am and 5 am. However, the overall write rate to the topic is very 
> smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward. Only when that minimum value is 
> updated does everything advance, which manifests as serious shake in the 
> output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> is more likely to hit edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If the minimum is replaced with a median value, the overall data flow becomes 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  





[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835871#comment-17835871
 ] 

Kenneth William Krugler commented on FLINK-35076:
-

There are logical reasons why picking the minimum value is a requirement. In 
your situation, you could either set up the watermark strategy for your Kafka 
source to have a "[max 
idleness|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]", 
or you could shuffle the stream (via a rebalance()), which would avoid the 
problem of an idle partition.






[jira] [Commented] (FLINK-34975) FLIP-427: ForSt - Disaggregated state Store

2024-04-01 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17832917#comment-17832917
 ] 

Kenneth William Krugler commented on FLINK-34975:
-

I'd previously explored using [AnyBlob|https://github.com/durner/AnyBlob] for 
maintaining state in S3, though I stopped when I ran into C++/JNI build issues 
on my Mac (M1 processor-related). AnyBlob is C++, so not sure it would be worth 
the effort, but I think they use some standard remote storage optimization 
techniques that could be interesting.

> FLIP-427: ForSt - Disaggregated state Store
> ---
>
> Key: FLINK-34975
> URL: https://issues.apache.org/jira/browse/FLINK-34975
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
> Fix For: 2.0.0
>
>
> This is a sub-FLIP for the disaggregated state management and its related 
> work, please read the [FLIP-423|https://cwiki.apache.org/confluence/x/R4p3EQ] 
> first to know the whole story.
> As described in FLIP-423, there are some tough issues with the embedded state 
> backend on the local file system, especially when dealing with extremely large 
> state:
>  # {*}Constraints of local disk space complicate the prediction of storage 
> requirements, potentially leading to job failures{*}: Especially in cloud 
> native deployment mode, pre-allocated local disks typically face strict 
> capacity constraints, making it challenging to forecast the size requirements 
> of job states. Over-provisioning disk space results in unnecessary resource 
> overhead, while under-provisioning risks job failure due to insufficient 
> space.
>  # *The tight coupling of compute and storage resources leads to 
> underutilization and increased waste:* Jobs can generally be categorized as 
> either CPU-intensive or IO-intensive. In a coupled architecture, 
> CPU-intensive jobs leave a significant portion of storage resources 
> underutilized, whereas IO-intensive jobs result in idle computing resources.
> By considering remote storage as the primary storage, all working states are 
> maintained on the remote file system, which brings several advantages:
>  # *Remote storages e.g. S3/HDFS typically offer elastic scalability, 
> theoretically providing unlimited space.*
>  # *The allocation of remote storage resources can be optimized by reducing 
> them for CPU-intensive jobs and augmenting them for IO-intensive jobs, thus 
> enhancing overall resource utilization.*
>  # *This architecture facilitates a highly efficient and lightweight process 
> for checkpointing, recovery, and rescaling through fast copy or simple move.*
> This FLIP aims to realize disaggregated state for our new key-value store 
> named *ForSt*, which evolves from RocksDB and supports remote file systems. 
> This lets Flink shed the disadvantages of the coupled state architecture 
> and embrace scalable, flexible cloud-native storage.
> Please see [FLIP-427|https://cwiki.apache.org/confluence/x/T4p3EQ] for more 
> details.





[jira] [Commented] (FLINK-30112) Improve docs re state compression

2023-11-16 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786890#comment-17786890
 ] 

Kenneth William Krugler commented on FLINK-30112:
-

As of https://issues.apache.org/jira/browse/FLINK-30113, I think the docs 
should now be updated to note that both keyed and operator state compression is 
supported.

It would be good to clarify that `execution.checkpointing.snapshot-compression` 
is used for both keyed and operator state compression.

And that there's a change to snapshot format which isn't backwards compatible 
(but is forward compatible), or so I assume?

> Improve docs re state compression
> -
>
> Key: FLINK-30112
> URL: https://issues.apache.org/jira/browse/FLINK-30112
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.19.0
>
>
> Documentation should state explicitly that state compression is supported only 
> for KeyedState as of now.
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/large_state_tuning/#compression





[jira] [Commented] (FLINK-28653) State Schema Evolution does not work - Flink defaults to Kryo serialization even for POJOs and Avro SpecificRecords

2022-07-24 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570567#comment-17570567
 ] 

Kenneth William Krugler commented on FLINK-28653:
-

What happens if you explicitly define the no-args constructor and 
getters/setters for User.java, without relying on Lombok?
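
For reference, a hand-written version of such a class might look like the sketch below. The field names (id, createdAt, name) are assumptions for illustration, since the issue only says the class uses Integer, Long and String fields. The outer class ExplicitPojoSketch is likewise hypothetical and exists only to keep the example in one file.

```java
// Hypothetical stand-in for io.peleg.pojo.User; field names are assumed.
class ExplicitPojoSketch {

    // Public static nested class with no Lombok annotations.
    public static class User {
        private Integer id;
        private Long createdAt;
        private String name;

        // Explicit public no-argument constructor.
        public User() {}

        // Explicit getters and setters following the getX/setX naming convention.
        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public Long getCreatedAt() { return createdAt; }
        public void setCreatedAt(Long createdAt) { this.createdAt = createdAt; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        User user = new User();
        user.setId(1);
        user.setCreatedAt(1658620800000L);
        user.setName("example");
        System.out.println(user.getId() + " " + user.getCreatedAt() + " " + user.getName());
    }
}
```

If the job behaves differently with a class written this way, that would suggest the Lombok-generated members are what pushes the type onto the Kryo path.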

> State Schema Evolution does not work - Flink defaults to Kryo serialization 
> even for POJOs and Avro SpecificRecords
> ---
>
> Key: FLINK-28653
> URL: https://issues.apache.org/jira/browse/FLINK-28653
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Runtime / State Backends
>Affects Versions: 1.14.3, 1.15.0
> Environment: I ran the job on a Flink cluster I spun up using docker 
> compose:
> ```
> version: "2.2"
> services:
>   jobmanager:
>     image: flink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> ```
>  My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip.
> I'm running macOS Monterey Version 12.4.
>Reporter: Peleg Tsadok
>Priority: Major
>  Labels: KryoSerializer, State, avro, pojo, schema-evolution
>
> I am trying to do a POC of Flink State Schema Evolution. I am using Flink 
> 1.15.0 and Java 11 but also tested on Flink 1.14.3.
> I tried to create 3 data classes - one for each serialization type:
> 1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not 
> supported for POJO serialization in Flink.
> 2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, 
> `Long`, `String`. The getters, setters and constructors are generated using 
> Lombok.
> 3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin.
> For each class I wrote a stream job that uses a time window to buffer 
> elements and turn them into a list.
> For each class I tried to do the following:
> 1. Run a job
> 2. Stop with savepoint
> 3. Add a field to the data class
> 4. Submit using savepoint
> For all data classes the submit with savepoint failed with this exception:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from 
> any of the 1 provided restore options.
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>     ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
>  

[jira] [Commented] (FLINK-21548) keyBy operation produces skewed record distribution for low-cardinality keys

2021-05-01 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337814#comment-17337814
 ] 

Kenneth William Krugler commented on FLINK-21548:
-

Hi [~izeigerman] - it would be good if you closed this issue as "Not a bug". If 
you're still looking for input from the community on how best to solve this 
issue, I'd suggest asking the question on the Flink user mailing list, thanks.

> keyBy operation produces skewed record distribution for low-cardinality keys
> 
>
> Key: FLINK-21548
> URL: https://issues.apache.org/jira/browse/FLINK-21548
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.11.0, 1.12.1
>Reporter: Iaroslav Zeigerman
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 
> 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
>
>
> When the cardinality of keys matches the existing parallelism not all 
> downstream tasks are utilized in the downstream operator. Even those that are 
> utilized are not utilized evenly.
> For example if I have 500 unique keys [0, 500) only 313 downstream tasks (out 
> of 500) will receive any records at all. 
> *NOTE*: for all examples below 1 million record instances were used.
> This behavior can easily be reproduced with the following test case:
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
> object Test {
>   val parallelism = 500
>   // 1 million records, matching the note above and the counts in the output below
>   val recordsNum  = 1000000
>   def run(): Unit = {
>     val recordIds = (0 to recordsNum).map(_ % parallelism)
>     val tasks = recordIds.map(selectTask)
>     println(s"Total unique keys: ${recordIds.toSet.size}")
>     println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>     println("===")
>     println(s"Tasks involved: ${tasks.toSet.size}")
>     println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>   }
>   def selectTask(key: Int): Int =
>     KeyGroupRangeAssignment.assignToKeyGroup(
>       key,
>       parallelism
>     )
> }
> {code}
> Which produces the following results:
> {noformat}
> Total unique keys: 500
> Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
> ===
> Tasks involved: 313
> Record distribution by task: Vector((147,1), (248,1), ..., 
> (232,2000), (100,2000))
> {noformat}
> Record distribution visualized:
>  !Screen Shot 2021-03-01 at 10.52.31 AM.png!
> I have determined that in order to achieve the utilization of all tasks the 
> number of unique keys should be at least 5 times of the parallelism value. 
> The relation between number of unique keys and a fraction of utilized tasks 
> appears to be exponential:
>  !Screen Shot 2021-03-01 at 10.54.42 AM.png!  
> But with 5x number of keys the skew is still quite significant:
> !Screen Shot 2021-03-01 at 10.57.33 AM.png!
> Given that keys used in my test are integer values for which `hashCode` 
> returns the value itself I tend to believe that the skew is caused by the 
> Flink's murmur hash implementation which is used 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22429) Exclude Sub-Tasks in all bot "stale-unassigned" rule of Jira Bot

2021-04-23 Thread Kenneth William Krugler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth William Krugler updated FLINK-22429:

Summary: Exclude Sub-Tasks in all bot "stale-unassigned" rule of Jira Bot  
(was: Exlude Sub-Tasks in all bot "stale-unassigned" rule of Jira Bot)

> Exclude Sub-Tasks in all bot "stale-unassigned" rule of Jira Bot
> 
>
> Key: FLINK-22429
> URL: https://issues.apache.org/jira/browse/FLINK-22429
> Project: Flink
>  Issue Type: Bug
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-9541) Add robots.txt and sitemap.xml to Flink website

2021-04-22 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329462#comment-17329462
 ] 

Kenneth William Krugler commented on FLINK-9541:


[~chesnay] - I'd take on this issue, but I don't think I can assign it to 
myself.

I think it's still important, as the search I'd referenced in my initial 
description still returns, as its #1 hit, the JavaDoc for ReducingState from 
1.3-SNAPSHOT (that's 1.3, not 1.13).

> Add robots.txt and sitemap.xml to Flink website
> ---
>
> Key: FLINK-9541
> URL: https://issues.apache.org/jira/browse/FLINK-9541
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Fabian Hueske
>Priority: Major
>  Labels: stale-major
>
> From the [dev mailing 
> list|https://lists.apache.org/thread.html/71ce1bfbed1cf5f0069b27a46df1cd4dccbe8abefa75ac85601b088b@%3Cdev.flink.apache.org%3E]:
> {quote}
> It would help to add a sitemap (and the robots.txt required to reference it) 
> for flink.apache.org and ci.apache.org (for /projects/flink)
> You can see what Tomcat did along these lines - 
> http://tomcat.apache.org/robots.txt references 
> http://tomcat.apache.org/sitemap.xml, which is a sitemap index file pointing 
> to http://tomcat.apache.org/sitemap-main.xml
> By doing this, you can emphasize more recent versions of docs. There are 
> other benefits, but reducing poor Google search results (to me) is the 
> biggest win.
> E.g. https://www.google.com/search?q=flink+reducingstate (search on flink 
> reducing state) shows the 1.3 Javadocs (hit #1), master (1.6-SNAPSHOT) 
> Javadocs (hit #2), and then many pages of other results.
> Whereas the Javadocs for 1.5 and 1.4 are nowhere to be found.
> {quote}





[jira] [Commented] (FLINK-22139) Flink Jobmanager & Task Manger logs are not writing to the logs files

2021-04-07 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316425#comment-17316425
 ] 

Kenneth William Krugler commented on FLINK-22139:
-

Hi [~bhagi__R] - apologies if I missed it, but I don't remember seeing you ask 
this question on the Flink user mailing list. If you haven't, then that's a 
good first step before filing a Jira issue, as you should hopefully get input 
on whether it's an actual bug or not.

> Flink Jobmanager & Task Manger logs are not writing to the logs files
> -
>
> Key: FLINK-22139
> URL: https://issues.apache.org/jira/browse/FLINK-22139
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.12.2
> Environment: on kubernetes flink standalone deployment with 
> jobmanager HA is enabled.
>Reporter: Bhagi
>Priority: Major
>
> Hi Team,
> I am submitting jobs and restarting the job manager and task manager 
> pods. Log files are generated with the task manager and job manager names, 
> but the job manager and task manager log files have size 0. I am not sure 
> whether some configuration is missing, or why logs are not written to the log files.
> # Task Manager pod###
> flink@flink-taskmanager-85b6585b7-hhgl7:~$ ls -lart log/
> total 0
> -rw-r--r-- 1 flink flink  0 Apr  7 09:35 
> flink--taskexecutor-0-flink-taskmanager-85b6585b7-hhgl7.log
> flink@flink-taskmanager-85b6585b7-hhgl7:~$
> ### Jobmanager pod Logs #
> flink@flink-jobmanager-f6db89b7f-lq4ps:~$
> -rw-r--r-- 1 7148739 flink 0 Apr  7 06:36 
> flink--standalonesession-0-flink-jobmanager-f6db89b7f-gtkx5.log
> -rw-r--r-- 1 7148739 flink 0 Apr  7 06:36 
> flink--standalonesession-0-flink-jobmanager-f6db89b7f-wnrfm.log
> -rw-r--r-- 1 7148739 flink 0 Apr  7 06:37 
> flink--standalonesession-0-flink-jobmanager-f6db89b7f-2b2fs.log
> -rw-r--r-- 1 7148739 flink 0 Apr  7 06:37 
> flink--standalonesession-0-flink-jobmanager-f6db89b7f-7kdhh.log
> -rw-r--r-- 1 7148739 flink 0 Apr  7 09:35 
> flink--standalonesession-0-flink-jobmanager-f6db89b7f-twhkt.log
> drwxrwxrwx 2 7148739 flink35 Apr  7 09:35 .
> -rw-r--r-- 1 7148739 flink 0 Apr  7 09:35 
> flink--standalonesession-0-flink-jobmanager-f6db89b7f-lq4ps.log
> flink@flink-jobmanager-f6db89b7f-lq4ps:~$
> I configured log4j.properties for flink
> log4j.properties: |+
> monitorInterval=30
> rootLogger.level = INFO
> rootLogger.appenderRef.file.ref = MainAppender
> logger.flink.name = org.apache.flink
> logger.flink.level = INFO
> logger.akka.name = akka
> logger.akka.level = INFO
> appender.main.name = MainAppender
> appender.main.type = RollingFile
> appender.main.append = true
> appender.main.fileName = ${sys:log.file}
> appender.main.filePattern = ${sys:log.file}.%i
> appender.main.layout.type = PatternLayout
> appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> appender.main.policies.type = Policies
> appender.main.policies.size.type = SizeBasedTriggeringPolicy
> appender.main.policies.size.size = 100MB
> appender.main.policies.startup.type = OnStartupTriggeringPolicy
> appender.main.strategy.type = DefaultRolloverStrategy
> appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
> logger.netty.name = 
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
> logger.netty.level = OFF





[jira] [Commented] (FLINK-21548) keyBy operation produces skewed record distribution for low-cardinality keys

2021-03-01 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17293200#comment-17293200
 ] 

Kenneth William Krugler commented on FLINK-21548:
-

Yes, if you generate 1M random values in the range 0...600, you will get a nice 
distribution of values. My example generated the 600 hash codes for the 600 
unique key values. These 600 (or so, might be a few less) unique hash codes 
then get partitioned to 200 slots via modulo math. When that happens, you don't 
get a perfect, uniform distribution. So some slots get 0 hash codes, and other 
slots get 8, 9, 10 or more hash codes. To do better than this requires a 
partitioner with knowledge of the distribution of the hash codes for the unique 
keys, which means some kind of custom "data aware" partitioner.
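
The effect described above can be reproduced with a small standalone sketch. This is a simplified model under stated assumptions: the mix function is a generic murmur-style integer mixer written for this example, not Flink's hash implementation, and the slot is chosen by a plain modulo rather than Flink's key-group logic. The names SlotSkewSketch, mix and keysPerSlot are hypothetical.

```java
import java.util.Arrays;

// Hypothetical illustration only; this does not call into Flink.
class SlotSkewSketch {

    /** Murmur-style integer mixer, written for illustration (not Flink's implementation). */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Counts how many of the keys 0..numKeys-1 land in each slot via hash modulo. */
    static int[] keysPerSlot(int numKeys, int numSlots) {
        int[] counts = new int[numSlots];
        for (int key = 0; key < numKeys; key++) {
            // floorMod keeps the slot index non-negative for negative hash values
            int slot = Math.floorMod(mix(key), numSlots);
            counts[slot]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 600 unique keys spread over 200 slots, as in the comment above.
        int[] counts = keysPerSlot(600, 200);
        int min = Arrays.stream(counts).min().getAsInt();
        int max = Arrays.stream(counts).max().getAsInt();
        long empty = Arrays.stream(counts).filter(c -> c == 0).count();
        System.out.println("min=" + min + " max=" + max + " emptySlots=" + empty);
    }
}
```

The average is 3 keys per slot, but with only 600 keys there is no reason to expect every slot to get exactly 3. If the hash behaves roughly like a random function, some slots end up with noticeably more and some may get none, which is the skew under discussion.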

> keyBy operation produces skewed record distribution for low-cardinality keys
> 
>
> Key: FLINK-21548
> URL: https://issues.apache.org/jira/browse/FLINK-21548
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Coordination, Runtime / Task
>Affects Versions: 1.11.0, 1.12.1
>Reporter: Iaroslav Zeigerman
>Priority: Major
> Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 
> 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png
>
>
> When the cardinality of keys matches the existing parallelism not all 
> downstream tasks are utilized in the downstream operator. Even those that are 
> utilized are not utilized evenly.
> For example if I have 500 unique keys [0, 500) only 313 downstream tasks (out 
> of 500) will receive any records at all. 
> *NOTE*: for all examples below 1 million record instances were used.
> This behavior can easily be reproduced with the following test case:
> {code:scala}
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment
> object Test {
>   val parallelism = 500
>   // 1 million records, matching the note above and the counts in the output below
>   val recordsNum  = 1000000
>   def run(): Unit = {
>     val recordIds = (0 to recordsNum).map(_ % parallelism)
>     val tasks = recordIds.map(selectTask)
>     println(s"Total unique keys: ${recordIds.toSet.size}")
>     println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>     println("===")
>     println(s"Tasks involved: ${tasks.toSet.size}")
>     println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
>   }
>   def selectTask(key: Int): Int =
>     KeyGroupRangeAssignment.assignToKeyGroup(
>       key,
>       parallelism
>     )
> }
> {code}
> Which produces the following results:
> {noformat}
> Total unique keys: 500
> Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
> ===
> Tasks involved: 313
> Record distribution by task: Vector((147,1), (248,1), ..., 
> (232,2000), (100,2000))
> {noformat}
> Record distribution visualized:
>  !Screen Shot 2021-03-01 at 10.52.31 AM.png!
> I have determined that in order to achieve the utilization of all tasks the 
> number of unique keys should be at least 5 times of the parallelism value. 
> The relation between number of unique keys and a fraction of utilized tasks 
> appears to be exponential:
>  !Screen Shot 2021-03-01 at 10.54.42 AM.png!  
> But with 5x number of keys the skew is still quite significant:
> !Screen Shot 2021-03-01 at 10.57.33 AM.png!
> Given that keys used in my test are integer values for which `hashCode` 
> returns the value itself I tend to believe that the skew is caused by the 
> Flink's murmur hash implementation which is used 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21548) keyBy operation produces skewed record distribution for low-cardinality keys

2021-03-01 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293126#comment-17293126
 ] 

Kenneth William Krugler commented on FLINK-21548:
-

Hi [~izeigerman] - I don't believe this is a bug in Flink, but rather the 
expected outcome of random partitioning based on keys. As an exercise, write a 
program to generate, say, 600 random integers, and (by modulo) place them into 
200 partitions (aka buckets). Perfect partitioning would have 3 values per 
partition, but in reality you get from 0 up to 8, 9, 10 (or more) values per 
partition. The only way to avoid this is to have a data-aware partitioner that 
somehow knows how to assign keys to partitions to avoid the above.
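
As a back-of-the-envelope check, assume the hash codes behave like independent, 
uniformly distributed values (an assumption, not something measured in Flink). 
With m unique keys and n partitions, the expected number of partitions that 
receive at least one key is then:

```latex
\mathbb{E}[\text{non-empty partitions}]
  = n\left(1 - \left(1 - \frac{1}{n}\right)^{m}\right)
  \approx n\left(1 - e^{-m/n}\right)
```

For m = 600 and n = 200 this gives about 200(1 - e^{-3}) ≈ 190, so roughly 10 
partitions stay empty. For the m = n = 500 case in the issue description it 
gives about 500(1 - e^{-1}) ≈ 316, close to the 313 tasks reported there.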



[jira] [Closed] (FLINK-19794) who wrote the the documents?

2020-10-24 Thread Kenneth William Krugler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth William Krugler closed FLINK-19794.
---
Resolution: Not A Bug

> who wrote the the documents?
> 
>
> Key: FLINK-19794
> URL: https://issues.apache.org/jira/browse/FLINK-19794
> Project: Flink
>  Issue Type: Test
>Reporter: appleyuchi
>Priority: Major
>
> I'm learning Flink Documents
> There are 
> so much missing information
> and so much typos
> *Could you tell me who wrote the the 
> [documents|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html]
>  are in the mailing list?*
> So I can post question to them directly,such as:
> the mail title "*how to register TableAggregateFunction*"
> in the mailing list.
> Thanks for your help.





[jira] [Commented] (FLINK-19794) who wrote the the documents?

2020-10-24 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220212#comment-17220212
 ] 

Kenneth William Krugler commented on FLINK-19794:
-

Hi [~appleyuchi] - the right place to ask a question like this is on the [Flink 
user mailing list|https://flink.apache.org/community.html#mailing-lists]. But 
to give you the short answer, from the command line:

* git clone https://github.com/apache/flink.git
* cd flink
* find . -name "tableApi*"
* git blame ./docs/dev/table/tableApi.md

will tell you, for each line in the source Markdown file (tableApi.md, in this 
case) used to generate the online docs, who last edited it.



[jira] [Commented] (FLINK-19264) Identical jobs cannot be run concurrently

2020-09-16 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197096#comment-17197096
 ] 

Kenneth William Krugler commented on FLINK-19264:
-

Hmm, since FLINK-17295 was a recent change, either I did have a bug in my async 
MiniCluster mods, or there are other bugs lurking (at least in the 1.8/1.9 code 
base).

> Identical jobs cannot be run concurrently
> -
>
> Key: FLINK-19264
> URL: https://issues.apache.org/jira/browse/FLINK-19264
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.12.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> While working on FLINK-19123 I noticed that jobs often fail on 
> {{MiniCluster}} when multiple jobs are running at the same time. 
> I created a reproducer here: 
> https://github.com/aljoscha/flink/tree/flink-19123-fix-test-stream-env-alternative.
>  You can run {{MiniClusterConcurrencyITCase}} to see the problem in action. 
> Sometimes the test will succeed, sometimes it will fail with
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.test.example.MiniClusterConcurrencyITCase.submitConcurrently(MiniClusterConcurrencyITCase.java:60)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:107)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>   at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>   at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:926)
>   at akka.dispatch.OnComplete.internal(Future.scala:264)
>   

[jira] [Commented] (FLINK-19264) MiniCluster is flaky with concurrent job execution

2020-09-16 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197023#comment-17197023
 ] 

Kenneth William Krugler commented on FLINK-19264:
-

I often ran into this same issue, and eventually had to bail on async job 
submission with the `MiniCluster`. But this was with an older version of Flink 
(1.8/1.9) where I'd hacked up the code to support async submission of jobs, so 
I thought it was my bug.


[jira] [Comment Edited] (FLINK-19264) MiniCluster is flaky with concurrent job execution

2020-09-16 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197023#comment-17197023
 ] 

Kenneth William Krugler edited comment on FLINK-19264 at 9/16/20, 2:52 PM:
---

I often ran into this same issue, and eventually had to bail on async job 
submission with the MiniCluster. But this was with an older version of Flink 
(1.8/1.9) where I'd hacked up the code to support async submission of jobs, so 
I thought it was my bug.


was (Author: kkrugler):
I often ran into this same issue, and eventually had to bail on async job 
submission with the `MiniCluster`. But this was with an older version of Flink 
(1.8/1.9) where I'd hacked up the code to support async submission of jobs, so 
I thought it was my bug.


[jira] [Commented] (FLINK-19069) finalizeOnMaster takes too much time and client timeouts

2020-09-03 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190447#comment-17190447
 ] 

Kenneth William Krugler commented on FLINK-19069:
-

See also [https://kb.databricks.com/data/append-slow-with-spark-2.0.0.html]

> finalizeOnMaster takes too much time and client timeouts
> 
>
> Key: FLINK-19069
> URL: https://issues.apache.org/jira/browse/FLINK-19069
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0
>Reporter: Jiayi Liao
>Priority: Critical
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> Currently we execute {{finalizeOnMaster}} in JM's main thread, which may 
> block the JM for a very long time, so the client eventually times out. 
> For example, we'd like to write data to HDFS and commit files on JM, which 
> takes more than ten minutes to commit tens of thousands of files.





[jira] [Commented] (FLINK-19133) User provided kafka partitioners are not initialized correctly

2020-09-03 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190358#comment-17190358
 ] 

Kenneth William Krugler commented on FLINK-19133:
-

Yes, we ran into this exact issue recently with 1.11, where only partition 0 
was receiving data. "Fixed" it by passing Optional.empty() for the partitioner, 
so it would use Kafka partitioning vs. the FlinkFixedPartitioner, but good to 
see we weren't imagining things.
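
For readers wondering why everything ended up in partition 0, here is a 
simplified, self-contained model. It is not the Flink class: it assumes a 
fixed partitioner picks its target as the subtask index modulo the number of 
partitions, and that the index is only set in an open() call. The class name 
FixedPartitionerSketch and its methods are hypothetical. If open() is never 
called, the index keeps its default of 0 on every subtask.

```java
/** Hypothetical, simplified model of a "fixed" partitioner (not the Flink class). */
final class FixedPartitionerSketch {

    // Defaults to 0 until open() is called with the real subtask index.
    private int parallelInstanceId;

    /** Initialization hook; in this sketch it only records the subtask index. */
    void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    /** Each subtask always writes to one partition, chosen from its index. */
    int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        for (int subtask = 0; subtask < 4; subtask++) {
            FixedPartitionerSketch uninitialized = new FixedPartitionerSketch();
            FixedPartitionerSketch initialized = new FixedPartitionerSketch();
            initialized.open(subtask, 4);
            System.out.println("subtask " + subtask
                    + ": without open() -> partition " + uninitialized.partition(partitions)
                    + ", with open() -> partition " + initialized.partition(partitions));
        }
    }
}
```

Passing no custom partitioner sidesteps the problem because the choice of 
partition is then left to the Kafka producer rather than to the subtask index.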

> User provided kafka partitioners are not initialized correctly
> --
>
> Key: FLINK-19133
> URL: https://issues.apache.org/jira/browse/FLINK-19133
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.1
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.12.0, 1.11.2
>
>
> Reported in the ML: 
> https://lists.apache.org/thread.html/r94275a7314d44154eb1ac16237906e0f097e8a9d8a5a937e8dcb5e85%40%3Cdev.flink.apache.org%3E
> If a user provides a partitioner in combination with SerializationSchema it 
> is not initialized correctly and has no access to the parallel instance index 
> or number of parallel instances.





[jira] [Commented] (FLINK-18934) Idle stream does not advance watermark in connected stream

2020-08-28 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17186823#comment-17186823
 ] 

Kenneth William Krugler commented on FLINK-18934:
-

On the list, [~dwysakowicz] said:
{quote}Hi Kien,
I am afraid this is a valid bug. I am not 100% sure but the way I
understand the code the idleness mechanism applies to input channels,
which means e.g. when multiple parallel instances shuffle their results
to downstream operators.
In case of a two input operator, combining the watermark of two
different upstream operators happens inside of the operator itself.
There we do not have the idleness status. We do not have a status that a
whole upstream operator became idle. That's definitely a bug/limitation.
I'm also cc'ing Aljoscha who could maybe confirm my analysis.
Best,
Dawid{quote}

And [~aljoscha] added:
{quote}Yes, I'm afraid this analysis is correct. The StreamOperator, 
AbstractStreamOperator to be specific, computes the combined watermarks from 
both inputs here: 
https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
 The operator layer is not aware of idleness so it will never notice. The 
idleness only works on the level of inputs but is never forwarded to an 
operator itself.

To fix this we would have to also make operators aware of idleness such that 
they can take this into account when computing the combined output watermark.

Best,
Aljoscha{quote}
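
The behaviour described in the two quotes can be sketched in a few lines of 
self-contained Java. This is not the AbstractStreamOperator code: the class 
TwoInputWatermarkSketch, its methods, and the idlenessAware switch are 
hypothetical, and the "aware" branch is only one possible way to take idleness 
into account. The point is that a plain minimum over both inputs never 
advances while one input stays silent.

```java
/** Hypothetical model of how a two-input operator combines watermarks. */
final class TwoInputWatermarkSketch {

    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private boolean input1Idle;
    private boolean input2Idle;
    private final boolean idlenessAware;

    TwoInputWatermarkSketch(boolean idlenessAware) {
        this.idlenessAware = idlenessAware;
    }

    void onWatermark1(long timestamp) {
        input1Watermark = Math.max(input1Watermark, timestamp);
    }

    void onWatermark2(long timestamp) {
        input2Watermark = Math.max(input2Watermark, timestamp);
    }

    void markInput1Idle() {
        input1Idle = true;
    }

    void markInput2Idle() {
        input2Idle = true;
    }

    /** Output watermark of the operator under the chosen strategy. */
    long combinedWatermark() {
        if (idlenessAware) {
            // Assumed fix: ignore an idle input as long as the other one is active.
            if (input1Idle && !input2Idle) {
                return input2Watermark;
            }
            if (input2Idle && !input1Idle) {
                return input1Watermark;
            }
        }
        // Behaviour described in the quotes: a plain minimum, blind to idleness.
        return Math.min(input1Watermark, input2Watermark);
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch plain = new TwoInputWatermarkSketch(false);
        plain.onWatermark1(1000L);
        plain.markInput2Idle();
        System.out.println("plain minimum: " + plain.combinedWatermark());

        TwoInputWatermarkSketch aware = new TwoInputWatermarkSketch(true);
        aware.onWatermark1(1000L);
        aware.markInput2Idle();
        System.out.println("idleness-aware: " + aware.combinedWatermark());
    }
}
```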


> Idle stream does not advance watermark in connected stream
> --
>
> Key: FLINK-18934
> URL: https://issues.apache.org/jira/browse/FLINK-18934
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: Truong Duc Kien
>Priority: Major
>
> Per Flink documents, when a stream is idle, it will allow watermarks of 
> downstream operator to advance. However, when I connect an active data stream 
> with an idle data stream, the output watermark of the CoProcessOperator does 
> not increase.
> Here's a small test that reproduces the problem.
> https://github.com/kien-truong/flink-idleness-testing





[jira] [Updated] (FLINK-19087) ResultPartitionWriter should not expose subpartition but only subpartition-readers

2020-08-28 Thread Kenneth William Krugler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth William Krugler updated FLINK-19087:

Summary: ResultPartitionWriter should not expose subpartition but only 
subpartition-readers  (was: ReaultPartitionWriter should not expose 
subpartition but only subpartition-readers)

> ResultPartitionWriter should not expose subpartition but only 
> subpartition-readers
> --
>
> Key: FLINK-19087
> URL: https://issues.apache.org/jira/browse/FLINK-19087
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.12.0
>
>
> The {{ResultPartitionWriter}} currently gives arbitrary access to the 
> sub-partitions.
> These subpartitions may not always exist directly, such as in a sort based 
> shuffle.
> Necessary is only the access to a reader over a sub-partition's data (the 
> ResultSubpartitionView).
> In the spirit of minimal scope of knowledge, the methods should be scoped to 
> return readers, not the more general subpartitions.





[jira] [Commented] (FLINK-19069) finalizeOnMaster takes too much time and client timeouts

2020-08-27 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17186231#comment-17186231
 ] 

Kenneth William Krugler commented on FLINK-19069:
-

I'd recently posted about a related issue to the dev mailing list, where I 
asked:

{quote}[...] the default behavior of Hadoop’s FileOutputCommitter (with 
algorithm == 1) is to put files in task-specific sub-dirs.

It’s depending on a post-completion “merge paths” action to be taken by what is 
(for Hadoop) the Application Master.

I assume that when running on a real cluster, the 
HadoopOutputFormat.finalizeGlobal() method’s call to commitJob() would do this, 
but it doesn’t seem to be happening when I run locally.

If I set the algorithm version to 2, then “merge paths” is handled by 
FileOutputCommitter immediately, and the HadoopOutputFormat code finds files in 
the expected location.

Wondering if Flink should always be using version 2 of the algorithm, as that’s 
more performant when there are a lot of results (which is why it was added).

{quote}



[jira] [Commented] (FLINK-18961) In the case of FlatMap linking map, if map returns null, an exception will be thrown in FlatMap

2020-08-14 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17178007#comment-17178007
 ] 

Kenneth William Krugler commented on FLINK-18961:
-

Hi [~chesnay] - the exception is clear, but I see this biting a lot of new 
Flink users. I think it would be worthwhile to add a warning to the 
[MapFunction|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/MapFunction.html]
 documentation that says never to return a null value, and to use a 
FlatMapFunction as an alternative if a record might produce no result.

>  In the case of FlatMap linking map, if map returns null, an exception will 
> be thrown in FlatMap
> 
>
> Key: FLINK-18961
> URL: https://issues.apache.org/jira/browse/FLINK-18961
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet
>Affects Versions: 1.11.0
> Environment: Mac OS 10.13.6
> Kubernetes 1.16.8
> Flink 1.11.0
>Reporter: Ryan
>Priority: Minor
> Attachments: Lark20200814-173817.png, Lark20200814-173821.png, 
> Lark20200814-173824.png
>
>
> I found a DataSet problem. In the case of a FlatMap followed by a map, if the 
> map returns null, an exception will be thrown in FlatMap. I think it's a 
> problem with the operator chain. I will post a screenshot of the 
> corresponding stack call in the attachment.
> {code:java}
> text.filter(value -> value.f0.contains("any")).flatMap(new 
> FlatMapFunction, String>() {
>   @Override
>   public void flatMap(Tuple2 value, 
> Collector out) throws Exception {
>   Pattern pattern = Pattern.compile("\".*\"");
>   Matcher matcher = pattern.matcher(value.f0);
>   if(matcher.find()){
>   String match = matcher.group(0);
>   out.collect(match); // here throw Exception
>   }
>   }
> }).map(value -> {
> try {
> String jsonS = value.replace("\"\"","\"");
> jsonS = jsonS.substring(1,jsonS.length()-1);
> JSONObject json = JSONObject.parseObject(jsonS);
> String result = 
> json.getJSONObject("body").getJSONObject("message").getString("data");
> return result; // this is null 
> }catch (Exception e){
> return value;
> }
> }).print();
> Caused by: java.lang.NullPointerException: The system does not support 
> records that are null. Null values are only supported as fields inside other 
> objects.
>  at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:76)
>  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at com.lemonbox.Test$1.flatMap(Test.java:42)
>  at com.lemonbox.Test$1.flatMap(Test.java:35)
>  at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:58)
>  at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Commented] (FLINK-18934) Idle stream does not advance watermark in connected stream

2020-08-13 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177070#comment-17177070
 ] 

Kenneth William Krugler commented on FLINK-18934:
-

Hi [~kien_truong] - I would suggest posting this question to the Flink user 
mailing list, and after reviewing the responses, decide whether this is really 
a bug.

> Idle stream does not advance watermark in connected stream
> --
>
> Key: FLINK-18934
> URL: https://issues.apache.org/jira/browse/FLINK-18934
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.1
>Reporter: Truong Duc Kien
>Priority: Major
>
> Per the Flink documentation, when a stream is idle, it will allow watermarks of 
> downstream operators to advance. However, when I connect an active data stream 
> with an idle data stream, the output watermark of the CoProcessOperator does 
> not increase.
> Here's a small test that reproduces the problem.
> https://github.com/kien-truong/flink-idleness-testing





[jira] [Comment Edited] (FLINK-18723) performance test

2020-07-26 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17165409#comment-17165409
 ] 

Kenneth William Krugler edited comment on FLINK-18723 at 7/27/20, 3:38 AM:
---

Hi [~dahaishuantuoba] - this is a question best asked via the [Flink mailing 
list|https://flink.apache.org/community.html], or on [Stack 
Overflow|https://stackoverflow.com/questions/tagged/apache-flink]. It would 
be great if you could ask on one of those two venues, and close this issue, 
thanks!


was (Author: kkrugler):
Hi [~dahaishuantuoba] - this is a question best asked via the [Flink mailing 
list|https://flink.apache.org/community.html], or on [Stack 
Overflow|https://stackoverflow.com/questions/tagged/apache-flink].

> performance test
> 
>
> Key: FLINK-18723
> URL: https://issues.apache.org/jira/browse/FLINK-18723
> Project: Flink
>  Issue Type: Test
>Reporter: junbiao chen
>Priority: Major
>
> what is the best performance benchmark for flink





[jira] [Commented] (FLINK-18723) performance test

2020-07-26 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17165409#comment-17165409
 ] 

Kenneth William Krugler commented on FLINK-18723:
-

Hi [~dahaishuantuoba] - this is a question best asked via the [Flink mailing 
list|https://flink.apache.org/community.html], or on [Stack 
Overflow|https://stackoverflow.com/questions/tagged/apache-flink].

> performance test
> 
>
> Key: FLINK-18723
> URL: https://issues.apache.org/jira/browse/FLINK-18723
> Project: Flink
>  Issue Type: Test
>Reporter: junbiao chen
>Priority: Major
>
> what is the best performance benchmark for flink





[jira] [Created] (FLINK-18707) Calling remove() for RockDB ListState iterator does nothing

2020-07-24 Thread Kenneth William Krugler (Jira)
Kenneth William Krugler created FLINK-18707:
---

 Summary: Calling remove() for RockDB ListState iterator does 
nothing 
 Key: FLINK-18707
 URL: https://issues.apache.org/jira/browse/FLINK-18707
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.9.2
Reporter: Kenneth William Krugler


If you use the FsStateBackend (or MemoryStateBackend), and you have ListState, 
then you can get an iterator and remove() an entry, and it all works as 
expected.


If you use the RocksDBStateBackend, the remove() call doesn’t throw an 
exception, but the ListState isn’t updated.

As [~aljoscha] noted:
{quote}We cannot efficiently implement it [remove] for RocksDB
{quote}
So an exception should be thrown when remove() is called.
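
In the meantime, one possible workaround is to rebuild the list and write it back, rather than removing through the iterator. A minimal sketch, assuming a stand-in {{ListState}} interface that mirrors only the {{get()}} and {{update(List)}} methods of Flink's {{ListState}} (so it runs without Flink, with the checked exceptions omitted); {{InMemoryListState}}, {{removeIf}} and {{demo}} are hypothetical helpers:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ListStateRemovalSketch {

    // Stand-in mirroring two methods of Flink's ListState (assumption: checked exceptions omitted).
    interface ListState<T> {
        Iterable<T> get();
        void update(List<T> values);
    }

    // Hypothetical in-memory state, used only to make the sketch runnable.
    static class InMemoryListState<T> implements ListState<T> {
        private List<T> values = new ArrayList<>();

        @Override
        public Iterable<T> get() {
            return values;
        }

        @Override
        public void update(List<T> newValues) {
            values = new ArrayList<>(newValues);
        }
    }

    // Copies the entries to keep, then replaces the whole list with update()
    // instead of calling remove() on the state's iterator.
    static <T> void removeIf(ListState<T> state, Predicate<T> shouldRemove) {
        List<T> kept = new ArrayList<>();
        for (T value : state.get()) {
            if (!shouldRemove.test(value)) {
                kept.add(value);
            }
        }
        state.update(kept);
    }

    static List<Integer> demo() {
        ListState<Integer> state = new InMemoryListState<>();
        state.update(Arrays.asList(1, 2, 3, 4));
        removeIf(state, v -> v % 2 == 0);
        List<Integer> result = new ArrayList<>();
        state.get().forEach(result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3]
    }
}
```

Rewriting the whole list costs a full read and write per change, so this only suits lists small enough for that to be acceptable.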





[jira] [Commented] (FLINK-18436) how to implement the class `MyTupleReducer`in flink official document

2020-07-01 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17149465#comment-17149465
 ] 

Kenneth William Krugler commented on FLINK-18436:
-

Hi [~appleyuchi] - I'm going to suggest that you close this issue, make the 
changes that David Anderson suggested on Stack Overflow, and if you still have 
issues then please post to the Flink user mailing list. Just FYI, Jira issues 
are for bugs/improvements, not questions about Flink, thanks!

> how to implement the class `MyTupleReducer`in flink official document
> -
>
> Key: FLINK-18436
> URL: https://issues.apache.org/jira/browse/FLINK-18436
> Project: Flink
>  Issue Type: Test
>Reporter: appleyuchi
>Priority: Minor
>
> This question was posted at 
> [https://stackoverflow.com/questions/62553572/how-to-implement-the-class-mytuplereducerin-flink-official-document]
> but has no final answer:
>  
> I'm learning from the [Flink DataSet API 
> documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/dataset_transformations.html],
> which uses a class called {{MyTupleReducer}}.
> I'm trying to complete it: [https://paste.ubuntu.com/p/3CjphGQrXP/]
> but it's full of red underlines in IntelliJ.
> Could you show me the correct form of the above code?
> Thanks for your help!
> PS:
> I've written part of MyTupleReducer: [https://pastebin.ubuntu.com/p/m4rjs6t8QP/]
> but the return part is wrong.





[jira] [Commented] (FLINK-17478) Avro format logical type conversions do not work due to type mismatch

2020-05-06 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17101239#comment-17101239
 ] 

Kenneth William Krugler commented on FLINK-17478:
-

Hi [~gyfora] - is this related to FLINK-17486? 

> Avro format logical type conversions do not work due to type mismatch
> -
>
> Key: FLINK-17478
> URL: https://issues.apache.org/jira/browse/FLINK-17478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Gyula Fora
>Priority: Major
>
> We hit the following issue when trying to use avro logical timestamp types:
>  
> {code:java}
> CREATE TABLE source_table (
>  int_field INT,
>  timestamp_field TIMESTAMP(3)
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'avro_tset',
>  'connector.properties.bootstrap.servers' = '<...>',
>  'format.type' = 'avro',
>  'format.avro-schema' =
>  '{
>  "type": "record",
>  "name": "test",
>  "fields" : [
>  {"name": "int_field", "type": "int"},
>  {"name": "timestamp_field", "type": {"type":"long", "logicalType": 
> "timestamp-millis"}}
>  ]
>  }'
> ) 
>  
> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11'); 
> {code}
>  
> And the error: 
> {noformat}
> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be 
> cast to java.lang.Long
>  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>  at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>  at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>  at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>  at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
> {noformat}
> Dawid's analysis from the ML discussion:
> It seems that the information about the bridging class (java.sql.Timestamp in 
> this case) is lost in the stack. Because this information is lost/not 
> respected the planner produces LocalDateTime instead of a proper 
> java.sql.Timestamp time. The AvroRowSerializationSchema expects 
> java.sql.Timestamp for a column of TIMESTAMP type and thus it fails for 
> LocalDateTime. 





[jira] [Commented] (FLINK-16157) StateFun benchmark and performance

2020-02-18 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17039409#comment-17039409
 ] 

Kenneth William Krugler commented on FLINK-16157:
-

Hi [~omidbb] - this would be a good question for the [Flink user's 
list|https://flink.apache.org/community.html#mailing-lists] (instead of a 
Jira issue), thanks!

> StateFun benchmark and performance
> --
>
> Key: FLINK-16157
> URL: https://issues.apache.org/jira/browse/FLINK-16157
> Project: Flink
>  Issue Type: Wish
>  Components: Stateful Functions
>Reporter: Omid B
>Priority: Major
>
> Hi,
> Is there any project or attempt to measure the performance of StateFun 
> functions (throughput, state change latency, etc.)?
> Thanks





[jira] [Commented] (FLINK-15634) disableAutoGeneratedUIDs fails with coGroup and join

2020-01-18 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018636#comment-17018636
 ] 

Kenneth William Krugler commented on FLINK-15634:
-

Is this a duplicate of https://issues.apache.org/jira/browse/FLINK-14910?

> disableAutoGeneratedUIDs fails with coGroup and join
> 
>
> Key: FLINK-15634
> URL: https://issues.apache.org/jira/browse/FLINK-15634
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Jürgen Kreileder
>Priority: Major
>
> coGroup/join seems to generate two Map operators for which you can't set the 
> UID. 
> Here's a test case:
> {code:java}
> @Test
> public void testDisablingAutoUidsWorksWithCoGroup() throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.getConfig().disableAutoGeneratedUIDs();
>env
>   .addSource(new 
> NoOpSourceFunction()).setUidHash("")
>   .coGroup(env.addSource(new 
> NoOpSourceFunction()).setUidHash(""))
>   .where(o -> o).equalTo(o -> o)
>   .window(TumblingEventTimeWindows.of(Time.days(1)))
>   .with(new CoGroupFunction() {
>  @Override
>  public void coGroup(Iterable first, Iterable second, 
> Collector out) throws Exception {
>  }
>   }).setUidHash("")
>   .addSink(new 
> DiscardingSink<>()).setUidHash("");
>env.execute();
> }
> {code}
>  





[jira] [Commented] (FLINK-14798) how to setup a ha flink cluster on k8s?

2019-11-15 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975258#comment-16975258
 ] 

Kenneth William Krugler commented on FLINK-14798:
-

Hi [~lonely1eaf] - for questions like this, please post to the [Flink user 
mailing list|https://flink.apache.org/gettinghelp.html], thanks!


> how to setup a ha flink cluster on k8s?
> ---
>
> Key: FLINK-14798
> URL: https://issues.apache.org/jira/browse/FLINK-14798
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Affects Versions: 1.9.1
>Reporter: rock
>Priority: Major
>
> I'm trying to set up a Flink cluster on k8s for production use. But the setup 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html]
> is not HA: when the job manager goes down and is rescheduled, 
> the metadata for the running job is lost. 
>  
> I tried to use the HA setup for ZooKeeper 
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html]
> on k8s, but can't get it right.
>  
> Storing a job's metadata on k8s using a PVC or another external file system should 
> be very easy. Is there a way to achieve it?





[jira] [Commented] (FLINK-14472) Implement back-pressure monitor with non-blocking outputs

2019-10-21 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16956148#comment-16956148
 ] 

Kenneth William Krugler commented on FLINK-14472:
-

Did the issue ever get fixed, where asynchronous I/O back-pressure wasn't being 
reported? This was also due to how detection of back-pressure was tied to where 
threads were blocked. From [~trohrmann]'s email on March 25th, 2019:

{quote}I think Seed is correct that we don't properly report backpressure from 
an AsyncWaitOperator. The problem is that not the Task's main execution thread 
but the Emitter thread will emit the elements and, thus, be stuck in the 
`requestBufferBuilderBlocking` method.
{quote}

If that's still an outstanding issue, I'm wondering if the above change would 
also fix it.

> Implement back-pressure monitor with non-blocking outputs
> -
>
> Key: FLINK-14472
> URL: https://issues.apache.org/jira/browse/FLINK-14472
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
> Fix For: 1.10.0
>
>
> Currently back-pressure monitor relies on detecting task threads that are 
> stuck in `requestBufferBuilderBlocking`. There are actually two cases to 
> cause back-pressure ATM:
>  * There are no available buffers in `LocalBufferPool` and all the given 
> quotas from global pool are also exhausted. Then we need to wait for buffer 
> recycling to `LocalBufferPool`.
>  * No available buffers in `LocalBufferPool`, but the quota has not been used 
> up. While requesting buffer from global pool, it is blocked because of no 
> available buffers in global pool. Then we need to wait for buffer recycling 
> to global pool.
> We already implemented the non-blocking output for the first case in 
> [FLINK-14396|https://issues.apache.org/jira/browse/FLINK-14396], and we 
> expect the second case done together with adjusting the back-pressure monitor 
> which could check for `RecordWriter#isAvailable` instead.





[jira] [Commented] (FLINK-14197) Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-09-26 Thread Kenneth William Krugler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16938961#comment-16938961
 ] 

Kenneth William Krugler commented on FLINK-14197:
-

I would suggest posting to the mailing list, as you'll (a) get input from the 
larger Flink community, and (b) the discussion is more visible and thus useful 
to other users.

> Increasing trend for state size of keyed stream using ProcessWindowFunction 
> with ProcessingTimeSessionWindows
> -
>
> Key: FLINK-14197
> URL: https://issues.apache.org/jira/browse/FLINK-14197
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.9.0
> Environment: Tested with:
>  * Local Flink Mini Cluster running from IDE
>  * Flink standalone cluster run in docker
>Reporter: Oliver Kostera
>Priority: Major
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following 
> definition:
> {code:java}
> final SingleOutputStreamOperator processWindowFunctionStream 
> =
> 
> keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
> .process(new 
> CustomProcessWindowFunction()).uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
> .name("Process window function");
> {code}
> My checkpointing configuration is set to use RocksDB state backend with 
> incremental checkpointing and EXACTLY_ONCE mode.
> At runtime I noticed that even though data ingestion is static (same keys 
> and message frequency), the state size of the process window operator keeps 
> increasing. I tried to reproduce it with a minimal similar setup here: 
> https://github.com/loliver1234/flink-process-window-function and was 
> able to do so.
> Testing conditions:
> - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
> - RabbitMQ sink to collect messages
> - Simple ProcessWindowFunction that only pass messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
> Testing scenario:
> - Start the Flink job and check the initial state size - State Size: 127 KB
> - Start sending messages, the same 1000 unique keys every 1s (they do not 
> fall into the defined time window gap of 100ms, so each message should create 
> a new window)
> - State of the process window operator keeps increasing - after 1 million messages 
> the state ended up at around 2 MB
> - Stop sending messages and wait until the RabbitMQ queue is fully consumed and a few 
> checkpoints go by
> - Expected the state size to decrease to the base value, but it stayed at 
> 2 MB
> - Continue sending messages with the same keys, and the state size kept 
> increasing.
> What I checked:
> - Registration and deregistration of timestamps set for time windows - each 
> registration matched its deregistration
> - Checked that in fact there are no window merges
> - Tried custom Trigger disabling window merges and setting onProcessingTime 
> trigger to TriggerResult.FIRE_AND_PURGE - same state behavior
> On our staging environment, we noticed that the state for that operator keeps 
> increasing indefinitely, after some months reaching as much as 1.5 GB for 100k 
> unique keys.
> Flink commit id: 9c32ed9
>  


