Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
damccorm merged PR #37466: URL: https://github.com/apache/beam/pull/37466 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
junaiddshaukat commented on PR #37466: URL: https://github.com/apache/beam/pull/37466#issuecomment-3837444343

@damccorm, I have addressed all review feedback:
* Replaced `flush()` with `flushLRU(0)` and removed the duplicate method
* Removed code duplication between `flush` and `flushLRU`
* Inlined the `touchKey` logic and optimized it to call `delete()` only when the key exists

Let me know if further changes are needed!
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
junaiddshaukat commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756112669
##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator
     this.groups = new Map();
   }

+  /**
+   * Flushes entries from the cache using LRU eviction.
+   * Evicts the least recently used entries (from the front of the Map)
+   * until the cache size is at or below the target.
+   */
+  flushLRU(target: number): ProcessResult {
+    const result = new ProcessResultBuilder();
+    const toDelete: string[] = [];
+    // Iterate from the front (oldest/least recently used entries)
+    for (const [wkey, values] of this.groups) {
+      if (this.groups.size - toDelete.length <= target) {
+        break;
+      }
+      const parts = wkey.split(" ");
+      const encodedWindow = parts[0];
+      const encodedKey = parts[1];
+      const window = decodeFromBase64(encodedWindow, this.windowCoder);
+      result.add(
+        this.receiver.receive({
+          value: {
+            key: decodeFromBase64(encodedKey, this.keyCoder),
+            value: values,
+          },
+          windows: [window],
+          timestamp: window.maxTimestamp(),
+          pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+        }),
+      );
+      toDelete.push(wkey);
+    }
+    for (const wkey of toDelete) {
+      this.groups.delete(wkey);
+    }
+    return result.build();
+  }
Review Comment:
Great suggestion! You're absolutely right: since `flush` is only called with
target=0 (in `finishBundle`), and `flushLRU(0)` achieves the same result of
flushing all entries, we can simply replace `flush(target)` with `flushLRU(0)`.
Let me fix that!
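The eviction semantics being discussed can be sketched in isolation. The snippet below uses a hypothetical standalone `evictLRU` function that captures only the size-based eviction loop from `flushLRU`; the real method also emits each evicted entry downstream before deleting it.

```typescript
// Standalone sketch of the eviction loop inside flushLRU (evictLRU is a
// hypothetical name; the real method also emits each evicted entry
// downstream before deleting it).
function evictLRU<K, V>(groups: Map<K, V>, target: number): K[] {
  const evicted: K[] = [];
  // Map iteration order is insertion order, so the front of the Map holds
  // the least recently used entries (touched keys are re-inserted at the end).
  for (const [key] of groups) {
    if (groups.size - evicted.length <= target) {
      break;
    }
    evicted.push(key);
  }
  for (const key of evicted) {
    groups.delete(key);
  }
  return evicted;
}

// A target of 0 evicts every entry, which is why flushLRU(0) can subsume
// the old flush() behavior.
const demo = new Map([["a", 1], ["b", 2], ["c", 3]]);
evictLRU(demo, 2); // evicts "a", the oldest entry; "b" and "c" remain
evictLRU(demo, 0); // flushes the rest, leaving the cache empty
```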
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
damccorm commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756100027
##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator
     this.groups = new Map();
   }

+  /**
+   * Flushes entries from the cache using LRU eviction.
+   * Evicts the least recently used entries (from the front of the Map)
+   * until the cache size is at or below the target.
+   */
+  flushLRU(target: number): ProcessResult {
+    const result = new ProcessResultBuilder();
+    const toDelete: string[] = [];
+    // Iterate from the front (oldest/least recently used entries)
+    for (const [wkey, values] of this.groups) {
+      if (this.groups.size - toDelete.length <= target) {
+        break;
+      }
+      const parts = wkey.split(" ");
+      const encodedWindow = parts[0];
+      const encodedKey = parts[1];
+      const window = decodeFromBase64(encodedWindow, this.windowCoder);
+      result.add(
+        this.receiver.receive({
+          value: {
+            key: decodeFromBase64(encodedKey, this.keyCoder),
+            value: values,
+          },
+          windows: [window],
+          timestamp: window.maxTimestamp(),
+          pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+        }),
+      );
+      toDelete.push(wkey);
+    }
+    for (const wkey of toDelete) {
+      this.groups.delete(wkey);
+    }
+    return result.build();
+  }
Review Comment:
Do we even need to keep the existing flush implementation? We could probably
replace it with a `clear` operation that is a simpler equivalent of `flush(0)`
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
gemini-code-assist[bot] commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756092971
##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator
     this.groups = new Map();
   }

+  /**
+   * Flushes entries from the cache using LRU eviction.
+   * Evicts the least recently used entries (from the front of the Map)
+   * until the cache size is at or below the target.
+   */
+  flushLRU(target: number): ProcessResult {
+    const result = new ProcessResultBuilder();
+    const toDelete: string[] = [];
+    // Iterate from the front (oldest/least recently used entries)
+    for (const [wkey, values] of this.groups) {
+      if (this.groups.size - toDelete.length <= target) {
+        break;
+      }
+      const parts = wkey.split(" ");
+      const encodedWindow = parts[0];
+      const encodedKey = parts[1];
+      const window = decodeFromBase64(encodedWindow, this.windowCoder);
+      result.add(
+        this.receiver.receive({
+          value: {
+            key: decodeFromBase64(encodedKey, this.keyCoder),
+            value: values,
+          },
+          windows: [window],
+          timestamp: window.maxTimestamp(),
+          pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+        }),
+      );
+      toDelete.push(wkey);
+    }
+    for (const wkey of toDelete) {
+      this.groups.delete(wkey);
+    }
+    return result.build();
+  }
Review Comment:

There is significant code duplication between `flushLRU` and the `flush`
method. The logic for iterating, decoding keys, sending results, and collecting
keys for deletion is nearly identical. While the loop termination condition is
checked at a different point, the core processing logic is the same. To improve
maintainability and reduce redundancy, this common logic should be extracted
into a private helper method that both `flushLRU` and `flush` can call.
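One way the suggested extraction could look is sketched below. `flushEntries` is a hypothetical name, and the emit callback stands in for the decode-and-send-downstream logic that lives in the real operator; both `flush` and `flushLRU` could then delegate to it, differing only in the target they pass.

```typescript
// Hypothetical shared helper for the duplicated flush logic. The emit
// callback stands in for decoding wkey and sending the result downstream.
function flushEntries<V>(
  groups: Map<string, V>,
  target: number,
  emit: (wkey: string, values: V) => void,
): void {
  const toDelete: string[] = [];
  // Front of the Map holds the least recently used entries.
  for (const [wkey, values] of groups) {
    if (groups.size - toDelete.length <= target) {
      break;
    }
    emit(wkey, values);
    toDelete.push(wkey);
  }
  for (const wkey of toDelete) {
    groups.delete(wkey);
  }
}
```

Under this sketch, `flushLRU(target)` forwards its target while a full flush passes 0, so the only per-caller difference is a single argument.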
##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -468,24 +470,36 @@ export class CombinePerKeyPrecombineOperator
     );
   }

+  /**
+   * Moves a key to the end of the Map (most recently used position).
+   * JavaScript Maps preserve insertion order, so delete + set moves to end.
+   */
+  private touchKey(wkey: string, value: A) {
+    this.groups.delete(wkey);
+    this.groups.set(wkey, value);
+  }
+
   process(wvalue: WindowedValue) {
     for (const window of wvalue.windows) {
       const wkey =
         encodeToBase64(window, this.windowCoder) +
         " " +
         encodeToBase64(wvalue.value.key, this.keyCoder);
-      if (!this.groups.has(wkey)) {
-        this.groups.set(wkey, this.combineFn.createAccumulator());
-      }
-      this.groups.set(
-        wkey,
-        this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value),
-      );
+      const existingAccumulator = this.groups.get(wkey);
+      const newAccumulator =
+        existingAccumulator !== undefined
+          ? this.combineFn.addInput(existingAccumulator, wvalue.value.value)
+          : this.combineFn.addInput(
+              this.combineFn.createAccumulator(),
+              wvalue.value.value,
+            );
+      // Move to end (most recently used) by delete + set
+      this.touchKey(wkey, newAccumulator);
Review Comment:

The logic for updating the accumulator and managing the LRU behavior can be
simplified and made slightly more performant. The current approach uses a
`touchKey` helper method which is called for every element. This results in a
`delete` operation even for new keys, which is unnecessary. By inlining this
logic into the `process` method, we can avoid the helper and the extra
operation for new keys, making the code more concise and efficient. This change
would also make the `touchKey` method (lines 477-480) obsolete, allowing for
its removal.
```typescript
      const accumulator = this.groups.get(wkey);
      if (accumulator !== undefined) {
        this.groups.delete(wkey);
      }
      this.groups.set(
        wkey,
        this.combineFn.addInput(
          accumulator ?? this.combineFn.createAccumulator(),
          wvalue.value.value,
        ),
      );
```
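The delete-then-set idiom this suggestion relies on can be demonstrated on a plain Map. The snippet below is a minimal standalone illustration (the `touch` helper and the `order` map are illustrative names, not part of the operator): JavaScript Maps iterate in insertion order, so deleting a key before re-setting it moves it to the back, which is the most recently used position.

```typescript
// Minimal demonstration of the delete-then-set LRU idiom: Maps iterate
// in insertion order, so re-inserting a key moves it to the back.
const order = new Map<string, number>([["a", 1], ["b", 2], ["c", 3]]);

function touch(key: string, value: number): void {
  if (order.has(key)) {
    order.delete(key); // delete only when the key exists, as the suggestion notes
  }
  order.set(key, value); // set appends (or re-appends) at the back of the Map
}

touch("a", 10); // existing key moves to the back: iteration order is now b, c, a
touch("d", 4);  // new key skips the delete and is appended: b, c, a, d
```

Skipping the `delete` for new keys is what makes the inlined version cheaper than the unconditional `touchKey` helper: most inserts are new keys, and they pay only for the `set`.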
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
damccorm commented on PR #37466: URL: https://github.com/apache/beam/pull/37466#issuecomment-3837403087 /gemini review
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
github-actions[bot] commented on PR #37466: URL: https://github.com/apache/beam/pull/37466#issuecomment-3828896154

Assigning reviewers:

R: @Abacn added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment `assign to next reviewer`.

Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
junaiddshaukat commented on PR #37466: URL: https://github.com/apache/beam/pull/37466#issuecomment-3828855359 Hi @damccorm, I've added LRU eviction to the PrecombineOperator. Please have a look when you get time. Thanks.
Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]
gemini-code-assist[bot] commented on PR #37466: URL: https://github.com/apache/beam/pull/37466#issuecomment-3828820802

## Summary of Changes

Hello @junaiddshaukat, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the `CombinePerKeyPrecombineOperator` by integrating an LRU cache eviction mechanism. This change addresses a long-standing TODO, improving the operator's memory management by evicting the least recently accessed data when the cache capacity is reached. The implementation leverages native JavaScript Map insertion-order behavior to maintain the LRU order efficiently.

### Highlights

* **LRU Cache Eviction**: Implemented Least Recently Used (LRU) cache eviction for the `CombinePerKeyPrecombineOperator` to efficiently manage its internal state when the number of keys exceeds a defined limit.
* **New `touchKey()` Method**: Introduced a `touchKey()` private method that moves an accessed key to the end of the internal Map, effectively marking it as most recently used by leveraging JavaScript Map's insertion order preservation.
* **New `flushLRU()` Method**: Added a `flushLRU()` method responsible for evicting the least recently used entries from the cache (those at the front of the Map) until the cache size is within the specified limit.
* **Updated `process()` Logic**: The `process()` method was updated to utilize the new `touchKey()` method for every key access, ensuring that the LRU order is maintained. It also now calls `flushLRU()` instead of the previous random flush logic when the cache size limit is exceeded.