/cc Timo

On 5/11/2021 1:25 PM, Chesnay Schepler wrote:
I'm not so sure myself, so I'm pulling in Timo to help out. I would think that we shouldn't create copies of EMPTY_ROW, because it is a nice optimization. I think the deeper issue is that, while all other mutating methods check the arity (implicitly preventing modifications to EMPTY_ROW), setRowKind() does not. So maybe this case just wasn't considered. Conceptually, it doesn't make sense to me to allow modifying the RowKind of EMPTY_ROW.

On 5/11/2021 1:09 PM, Smile wrote:
> Hi Chesnay Schepler,
>
> Thank you for your reply. I found the problem just now. My code modifies the key obtained from the KeySelector by updating its RowKind. Some key selectors, such as BinaryRowDataKeySelector, return a copy of the key [1], but EmptyRowDataKeySelector always returns the same object [2]. The test case AggregateITCase.testGroupBySingleValue with the SQL query "SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)" is indeed a global join without a key, so when I call setRowKind(RowKind.DELETE) on my key, the BinaryRowDataUtil.EMPTY_ROW object changes, and all records with an empty key get the wrong key.
>
> Should EmptyRowDataKeySelector also return a copy of BinaryRowDataUtil.EMPTY_ROW? Otherwise, the key must never be changed, because it may also be used by other records.
>
> Smile
>
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49
> [2] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36
>
> Chesnay Schepler wrote:
>> This is a bit concerning. Could you re-run your test with assertions enabled, and/or modify BinaryRowData#assertIndexIsValid to always throw an error if one of the two assertions is not met?
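The hazard described above can be sketched in plain Java (the SimpleRow class and selector methods below are hypothetical stand-ins, not Flink's actual BinaryRowData or EmptyRowDataKeySelector): a selector that hands out a shared singleton lets one caller's setRowKind() leak into every other holder of the "empty" key, while a selector that returns a defensive copy does not.

```java
public class SharedKeyDemo {
    enum RowKind { INSERT, DELETE }

    // Hypothetical minimal row, mimicking the mutable-kind behavior.
    static class SimpleRow {
        RowKind kind = RowKind.INSERT;
        void setRowKind(RowKind k) { kind = k; }
        SimpleRow copy() { SimpleRow r = new SimpleRow(); r.kind = kind; return r; }
    }

    // Shared singleton, analogous to BinaryRowDataUtil.EMPTY_ROW.
    static final SimpleRow EMPTY_ROW = new SimpleRow();

    // Selector returning a defensive copy (the fix Smile suggests).
    static SimpleRow selectKeyCopy() { return EMPTY_ROW.copy(); }

    // Selector returning the singleton directly (the problematic pattern).
    static SimpleRow selectKeyShared() { return EMPTY_ROW; }

    public static void main(String[] args) {
        // With copies, mutating one key leaves the others untouched.
        SimpleRow keyC = selectKeyCopy();
        SimpleRow keyD = selectKeyCopy();
        keyC.setRowKind(RowKind.DELETE);
        System.out.println(keyD.kind);   // INSERT

        // With the shared singleton, one mutation leaks into every holder.
        SimpleRow keyA = selectKeyShared();
        SimpleRow keyB = selectKeyShared();
        keyA.setRowKind(RowKind.DELETE);
        System.out.println(keyB.kind);   // DELETE
    }
}
```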
On 5/11/2021 9:37 AM, Smile wrote:
> Hi all,
>
> I'm trying to add mini-batch optimizations for the regular join (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java) in the Blink planner, and some test cases fail, such as AggregateITCase.testGroupBySingleValue.
>
> After debugging, I found that the heap memory backing BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0], which led to some records being assigned a wrong key. However, my mini-batch code doesn't use any low-level operators with MemorySegment. I only buffer some records (RowData) in a Map, just like AbstractMapBundleOperator does. Object reuse was also disabled via env.getConfig.disableObjectReuse(). It looks as if StreamOneInputProcessor.processInput changed memory segments that do not belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A debugging page with more information is attached.
>
> I'm not familiar with org.apache.flink.core.memory.MemorySegment or sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have any ideas about why this happens or where to check next? Thank you.
>
> Smile
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
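The observed byte flip is consistent with the later diagnosis in this thread: BinaryRowData stores the RowKind in the header byte at offset 0 of its backing memory, and RowKind.DELETE encodes to byte value 3, so setting DELETE on the shared empty row would turn [0,0,...] into [3,0,...]. A toy byte[]-backed sketch (the ToyBinaryRow class is hypothetical, not Flink's actual layout code) illustrates this:

```java
public class HeaderByteDemo {
    // Hypothetical minimal row backed by a byte array, mimicking how a
    // binary row keeps its kind in the first header byte of its segment.
    static class ToyBinaryRow {
        final byte[] segment = new byte[8];  // header byte + payload bytes
        void setRowKindByte(byte kindByte) { segment[0] = kindByte; }
    }

    public static void main(String[] args) {
        // A single shared instance, like BinaryRowDataUtil.EMPTY_ROW.
        ToyBinaryRow sharedEmptyRow = new ToyBinaryRow();
        System.out.println(java.util.Arrays.toString(sharedEmptyRow.segment));
        // DELETE encodes to byte value 3 in Flink's RowKind.
        sharedEmptyRow.setRowKindByte((byte) 3);
        System.out.println(java.util.Arrays.toString(sharedEmptyRow.segment));
        // Prints [0, 0, 0, 0, 0, 0, 0, 0] then [3, 0, 0, 0, 0, 0, 0, 0],
        // matching the unexpected change Smile observed in the debugger.
    }
}
```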