Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-02-02 Thread via GitHub


damccorm merged PR #37466:
URL: https://github.com/apache/beam/pull/37466


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-02-02 Thread via GitHub


junaiddshaukat commented on PR #37466:
URL: https://github.com/apache/beam/pull/37466#issuecomment-3837444343

   @damccorm, I have addressed all review feedback:
   * Replaced `flush()` with `flushLRU(0)` and removed the duplicate method
   * Removed code duplication between `flush` and `flushLRU`
   * Inlined `touchKey` logic and optimized it to call `delete()` only when the 
key exists
   
   Let me know if any further changes are needed!





Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-02-02 Thread via GitHub


junaiddshaukat commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756112669


##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator
     this.groups = new Map();
   }
 
+  /**
+   * Flushes entries from the cache using LRU eviction.
+   * Evicts the least recently used entries (from the front of the Map)
+   * until the cache size is at or below the target.
+   */
+  flushLRU(target: number): ProcessResult {
+    const result = new ProcessResultBuilder();
+    const toDelete: string[] = [];
+    // Iterate from the front (oldest/least recently used entries)
+    for (const [wkey, values] of this.groups) {
+      if (this.groups.size - toDelete.length <= target) {
+        break;
+      }
+      const parts = wkey.split(" ");
+      const encodedWindow = parts[0];
+      const encodedKey = parts[1];
+      const window = decodeFromBase64(encodedWindow, this.windowCoder);
+      result.add(
+        this.receiver.receive({
+          value: {
+            key: decodeFromBase64(encodedKey, this.keyCoder),
+            value: values,
+          },
+          windows: [window],
+          timestamp: window.maxTimestamp(),
+          pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+        }),
+      );
+      toDelete.push(wkey);
+    }
+    for (const wkey of toDelete) {
+      this.groups.delete(wkey);
+    }
+    return result.build();
+  }

Review Comment:
   Great suggestion, you're absolutely right. Since `flush` is only called 
with `target=0` (in `finishBundle`), and `flushLRU(0)` achieves the same result 
of flushing all entries, we can simply replace `flush(target)` with 
`flushLRU(0)`. Let me fix that!






Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-02-02 Thread via GitHub


damccorm commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756100027


##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator

Review Comment:
   Do we even need to keep the existing flush implementation? We could probably 
replace it with a `clear` operation that is a simpler equivalent of `flush(0)`






Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-02-02 Thread via GitHub


gemini-code-assist[bot] commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756092971


##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There is significant code duplication between `flushLRU` and the `flush` 
method. The logic for iterating, decoding keys, sending results, and collecting 
keys for deletion is nearly identical. While the loop termination condition is 
checked at a different point, the core processing logic is the same. To improve 
maintainability and reduce redundancy, this common logic should be extracted 
into a private helper method that both `flushLRU` and `flush` can call.
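A minimal sketch of the helper extraction suggested above, under stated assumptions: `FlushSketch`, `evictDownTo`, and the `emit` callback are hypothetical names, with `emit` standing in for `this.receiver.receive(...)`, and keys are plain strings rather than encoded window/key pairs. It is not the operator's actual code; it only shows one shared loop serving both a partial flush and a full flush.

```typescript
// Hypothetical stand-in for the operator, not Beam code.
class FlushSketch<A> {
  private groups = new Map<string, A>();
  private emit: (wkey: string, acc: A) => void;

  constructor(emit: (wkey: string, acc: A) => void) {
    this.emit = emit;
  }

  set(wkey: string, acc: A): void {
    this.groups.set(wkey, acc);
  }

  get size(): number {
    return this.groups.size;
  }

  // Shared logic: emit and drop entries from the front of the Map
  // (the oldest ones) until at most `target` entries remain.
  private evictDownTo(target: number): void {
    const toDelete: string[] = [];
    for (const [wkey, acc] of this.groups) {
      if (this.groups.size - toDelete.length <= target) {
        break;
      }
      this.emit(wkey, acc);
      toDelete.push(wkey);
    }
    for (const wkey of toDelete) {
      this.groups.delete(wkey);
    }
  }

  flushLRU(target: number): void {
    this.evictDownTo(target);
  }

  // A full flush is the same loop with a target of zero.
  flush(): void {
    this.evictDownTo(0);
  }
}

const emittedKeys: string[] = [];
const sketch = new FlushSketch<number>((wkey) => {
  emittedKeys.push(wkey);
});
["w1 a", "w1 b", "w1 c"].forEach((k, i) => sketch.set(k, i));

sketch.flushLRU(2); // emits only the oldest entry
const afterPartial = emittedKeys.slice();
sketch.flush(); // emits the remaining two entries
```

Because `flush()` reduces to `evictDownTo(0)`, the same sketch also illustrates why a separate full-flush implementation may be unnecessary.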



##
sdks/typescript/src/apache_beam/worker/operators.ts:
##
@@ -468,24 +470,36 @@ export class CombinePerKeyPrecombineOperator
     );
   }
 
+  /**
+   * Moves a key to the end of the Map (most recently used position).
+   * JavaScript Maps preserve insertion order, so delete + set moves to end.
+   */
+  private touchKey(wkey: string, value: A) {
+    this.groups.delete(wkey);
+    this.groups.set(wkey, value);
+  }
+
   process(wvalue: WindowedValue) {
     for (const window of wvalue.windows) {
       const wkey =
         encodeToBase64(window, this.windowCoder) +
         " " +
         encodeToBase64(wvalue.value.key, this.keyCoder);
-      if (!this.groups.has(wkey)) {
-        this.groups.set(wkey, this.combineFn.createAccumulator());
-      }
-      this.groups.set(
-        wkey,
-        this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value),
-      );
+      const existingAccumulator = this.groups.get(wkey);
+      const newAccumulator =
+        existingAccumulator !== undefined
+          ? this.combineFn.addInput(existingAccumulator, wvalue.value.value)
+          : this.combineFn.addInput(
+              this.combineFn.createAccumulator(),
+              wvalue.value.value,
+            );
+      // Move to end (most recently used) by delete + set
+      this.touchKey(wkey, newAccumulator);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic for updating the accumulator and managing the LRU behavior can be 
simplified and made slightly more performant. The current approach uses a 
`touchKey` helper method which is called for every element. This results in a 
`delete` operation even for new keys, which is unnecessary. By inlining this 
logic into the `process` method, we can avoid the helper and the extra 
operation for new keys, making the code more concise and efficient. This change 
would also make the `touchKey` method (lines 477-480) obsolete, allowing for 
its removal.
   
   ```typescript
   const accumulator = this.groups.get(wkey);
   if (accumulator !== undefined) {
     this.groups.delete(wkey);
   }
   this.groups.set(
     wkey,
     this.combineFn.addInput(
       accumulator ?? this.combineFn.createAccumulator(),
       wvalue.value.value
     )
   );
   ```
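The Map behavior this suggestion relies on can be checked in isolation. The following is a small standalone sketch (the `order` Map and its keys are illustrative, not Beam code): calling `set` on an existing key keeps that key's original position, so a `delete` is needed only when the key is already present and should move to the end.

```typescript
// Standalone sketch: the Map and key names are illustrative, not Beam code.
const order = new Map<string, number>();
order.set("a", 1);
order.set("b", 2);
order.set("c", 3);

// Overwriting an existing key keeps its original position.
order.set("a", 10);
const afterSet = Array.from(order.keys());

// Deleting first and then setting moves the key to the end.
const current = order.get("a");
if (current !== undefined) {
  order.delete("a");
}
order.set("a", (current ?? 0) + 1);
const afterTouch = Array.from(order.keys());

// A brand-new key lands at the end without any delete.
order.set("d", 4);
const afterNew = Array.from(order.keys());
```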






Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-02-02 Thread via GitHub


damccorm commented on PR #37466:
URL: https://github.com/apache/beam/pull/37466#issuecomment-3837403087

   /gemini review





Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-01-31 Thread via GitHub


github-actions[bot] commented on PR #37466:
URL: https://github.com/apache/beam/pull/37466#issuecomment-3828896154

   Assigning reviewers:
   
   R: @Abacn added as fallback since no labels match configuration
   
   
   Note: If you would like to opt out of this review, comment `assign to next 
reviewer`.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review 
comments).





Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-01-31 Thread via GitHub


junaiddshaukat commented on PR #37466:
URL: https://github.com/apache/beam/pull/37466#issuecomment-3828855359

   Hi @damccorm, I added LRU eviction to PrecombineOperator. Please take a 
look when you have time. Thanks.





Re: [PR] Add LRU cache eviction to CombinePerKeyPrecombineOperator [beam]

2026-01-31 Thread via GitHub


gemini-code-assist[bot] commented on PR #37466:
URL: https://github.com/apache/beam/pull/37466#issuecomment-3828820802

   ## Summary of Changes
   
   Hello @junaiddshaukat, I'm Gemini Code Assist[^1]! I'm currently reviewing 
this pull request and will post my feedback shortly. In the meantime, here's a 
summary to help you and other reviewers quickly get up to speed!
   
   This pull request adds an LRU cache eviction mechanism to the 
`CombinePerKeyPrecombineOperator`. This change addresses a long-standing TODO, 
improving the operator's memory management and performance by evicting the 
least recently used entries when the cache capacity is reached. The 
implementation relies on native JavaScript Map insertion order to maintain the 
LRU order efficiently.
   
   ### Highlights
   
   * **LRU Cache Eviction**: Implemented Least Recently Used (LRU) cache 
eviction for the `CombinePerKeyPrecombineOperator` to efficiently manage its 
internal state when the number of keys exceeds a defined limit.
   * **New `touchKey()` Method**: Introduced a `touchKey()` private method that 
moves an accessed key to the end of the internal Map, effectively marking it as 
most recently used by leveraging JavaScript Map's insertion order preservation.
   * **New `flushLRU()` Method**: Added a `flushLRU()` method responsible for 
evicting the least recently used entries from the cache (those at the front of 
the Map) until the cache size is within the specified limit.
   * **Updated `process()` Logic**: The `process()` method was updated to 
utilize the new `touchKey()` method for every key access, ensuring that the LRU 
order is maintained. It also now calls `flushLRU()` instead of the previous 
random flush logic when the cache size limit is exceeded.
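The highlights above can be sketched end to end as a tiny LRU-bounded precombine cache for a sum. This is an illustration under stated assumptions, not the operator's implementation: `LruSumCache`, `maxKeys`, and the policy of evicting down to exactly `maxKeys` are hypothetical, and evicted entries are returned to the caller where the real operator would pass them downstream.

```typescript
// Minimal sketch of an LRU-bounded precombine cache for a sum.
// The class name, `maxKeys`, and the eviction target are assumptions.
class LruSumCache {
  private groups = new Map<string, number>();
  private maxKeys: number;

  constructor(maxKeys: number) {
    this.maxKeys = maxKeys;
  }

  // Adds `value` to the accumulator for `key` and returns any entries
  // evicted to keep the cache within `maxKeys`.
  process(key: string, value: number): Array<[string, number]> {
    const existing = this.groups.get(key);
    if (existing !== undefined) {
      // Re-inserting below moves the key to the most recently used end.
      this.groups.delete(key);
    }
    this.groups.set(key, (existing ?? 0) + value);

    const evicted: Array<[string, number]> = [];
    while (this.groups.size > this.maxKeys) {
      // The first key in iteration order is the least recently used.
      const oldest = this.groups.keys().next().value;
      if (oldest === undefined) {
        break;
      }
      evicted.push([oldest, this.groups.get(oldest) as number]);
      this.groups.delete(oldest);
    }
    return evicted;
  }

  keys(): string[] {
    return Array.from(this.groups.keys());
  }
}

const cache = new LruSumCache(2);
cache.process("a", 1);
cache.process("b", 1);
cache.process("a", 5); // "a" becomes the most recently used key
const evicted = cache.process("c", 1); // pushes out "b", the least recently used
```

Here `"b"` is evicted even though `"a"` was inserted first, because the later update to `"a"` moved it behind `"b"` in the Map's order.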
   
   [^1]: Review the [Privacy Notices](https://policies.google.com/privacy), 
[Generative AI Prohibited Use 
Policy](https://policies.google.com/terms/generative-ai/use-policy), [Terms of 
Service](https://policies.google.com/terms), and learn how to configure Gemini 
Code Assist in GitHub 
[here](https://developers.google.com/gemini-code-assist/docs/customize-gemini-behavior-github).
 Gemini can make mistakes, so double check it and [use code with 
caution](https://support.google.com/legal/answer/13505487).
   

