Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-04 Thread via GitHub


damccorm merged PR #37113:
URL: https://github.com/apache/beam/pull/37113


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-04 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2765132216


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,747 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import gc
+import heapq
+import itertools
+import logging
+import subprocess
+import threading
+import time
+from collections import Counter
+from collections import OrderedDict
+from collections import defaultdict
+from collections import deque
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Optional
+from typing import Tuple
+
+import numpy as np
+import torch
+from scipy.optimize import nnls
+
+from apache_beam.utils import multi_process_shared
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+    fallback_memory_mb: Default total memory if hardware detection fails.
+    poll_interval: Seconds between memory checks.
+    peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    self._gpu_available = self._detect_hardware()
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subproce
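[Editorial note: the quoted diff starts a daemon thread targeting `_poll_loop`, but the body of that method is cut off by the message truncation. The following is a hedged, self-contained sketch of how such a poll loop with a sliding-window peak is typically written; the class name `SlidingWindowMonitor` and the injected `read_fn` sampler are illustrative assumptions, not part of the PR.]

```python
import threading
import time
from collections import deque


class SlidingWindowMonitor:
  """Minimal stand-in for the GPUMonitor pattern: sample a reading on an
  interval and track the peak over a sliding time window."""
  def __init__(self, read_fn, poll_interval=0.05, window_seconds=1.0):
    self._read_fn = read_fn          # hypothetical sampler (e.g. nvidia-smi)
    self._poll_interval = poll_interval
    self._window = window_seconds
    self._history = deque()          # (timestamp, value) pairs
    self._peak = 0.0
    self._running = False
    self._lock = threading.Lock()
    self._thread = None

  def _poll_loop(self):
    # Presumed shape of the truncated method: refresh, then sleep.
    while self._running:
      self.refresh()
      time.sleep(self._poll_interval)

  def refresh(self):
    now = time.time()
    value = self._read_fn()
    with self._lock:
      self._history.append((now, value))
      # Drop samples older than the window, then recompute the peak.
      while self._history and now - self._history[0][0] > self._window:
        self._history.popleft()
      self._peak = max(v for _, v in self._history)

  def start(self):
    self._running = True
    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
    self._thread.start()

  def stop(self):
    self._running = False
    if self._thread:
      self._thread.join()

  def peak(self):
    with self._lock:
      return self._peak
```

Calling `refresh()` directly (as the quoted `refresh` docstring suggests) gives a deterministic sample without waiting for the background thread.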

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-04 Thread via GitHub


damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2764995461


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
(quoted diff omitted; identical to the copy quoted above)

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-03 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2761494084


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
(quoted diff omitted; identical to the copy quoted above)
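[Editorial note: the module imports `scipy.optimize.nnls`, and the test file below references a `ResourceEstimator`. One plausible reading, which is an assumption not confirmed by the quoted diff, is that per-model memory footprints are recovered from aggregate GPU readings by non-negative least squares: each observation records which models were resident and the total memory seen, and solving `A x = b` with `x >= 0` gives per-model size estimates.]

```python
import numpy as np
from scipy.optimize import nnls

# Each row marks which models were resident during one measurement
# (1 = loaded, 0 = not); b holds the total memory observed (MB).
A = np.array([
    [1, 0],  # only model 0 loaded
    [0, 1],  # only model 1 loaded
    [1, 1],  # both loaded
], dtype=float)
b = np.array([2000.0, 4000.0, 6000.0])

# nnls returns the non-negative solution and the residual norm.
sizes, residual = nnls(A, b)
print(sizes)  # approximately [2000., 4000.]
```

The non-negativity constraint matters here: a plain least-squares fit could assign a negative "size" to a model when observations are noisy, which would be meaningless as a memory footprint.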

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-03 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2761483860


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
(quoted diff omitted; identical to the copy quoted above)

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-02 Thread via GitHub


damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2755802583


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
(quoted diff omitted; identical to the copy quoted above)

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-02-02 Thread via GitHub


damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2755798969


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
(quoted diff omitted; identical to the copy quoted above)

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747636002


##
sdks/python/apache_beam/ml/inference/model_manager_test.py:
##
@@ -0,0 +1,596 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import threading
+import time
+import unittest
+from concurrent.futures import ThreadPoolExecutor
+from unittest.mock import patch
+
+from apache_beam.utils import multi_process_shared
+
+try:
+  from apache_beam.ml.inference.model_manager import GPUMonitor
+  from apache_beam.ml.inference.model_manager import ModelManager
+  from apache_beam.ml.inference.model_manager import ResourceEstimator
+except ImportError as e:
+  raise unittest.SkipTest("Model Manager dependencies are not installed")
+
+
+class MockGPUMonitor:
+  """
+  Simulates GPU hardware with cumulative memory tracking.
+  Allows simulating specific allocation spikes and baseline usage.
+  """
+  def __init__(self, total_memory=12000.0, peak_window: int = 5):
+    self._current = 0.0
+    self._peak = 0.0
+    self._total = total_memory
+    self._lock = threading.Lock()
+    self.running = False
+    self.history = []
+    self.peak_window = peak_window
+
+  def start(self):
+    self.running = True
+
+  def stop(self):
+    self.running = False
+
+  def get_stats(self):
+    with self._lock:
+      return self._current, self._peak, self._total
+
+  def reset_peak(self):
+    with self._lock:
+      self._peak = self._current
+      self.history = [self._current]
+
+  def set_usage(self, current_mb):
+    """Sets absolute usage (legacy helper)."""
+    with self._lock:
+      self._current = current_mb
+      self._peak = max(self._peak, current_mb)
+
+  def allocate(self, amount_mb):
+    """Simulates memory allocation (e.g., tensors loaded to VRAM)."""
+    with self._lock:
+      self._current += amount_mb
+      self.history.append(self._current)
+      if len(self.history) > self.peak_window:
+        self.history.pop(0)
+      self._peak = max(self.history)
+
+  def free(self, amount_mb):
+    """Simulates memory freeing (not used often if pooling is active)."""
+    with self._lock:
+      self._current = max(0.0, self._current - amount_mb)
+      self.history.append(self._current)
+      if len(self.history) > self.peak_window:
+        self.history.pop(0)
+      self._peak = max(self.history)
+
+  def refresh(self):
+    """Simulates a refresh of the monitor stats (no-op for mock)."""
+    pass
+
+
+class MockModel:
+  def __init__(self, name, size, monitor):
+    self.name = name
+    self.size = size
+    self.monitor = monitor
+    self.deleted = False
+    self.monitor.allocate(size)
+
+  def mock_model_unsafe_hard_delete(self):
+    if not self.deleted:
+      self.monitor.free(self.size)
+      self.deleted = True
+
+
+class Counter(object):
+  def __init__(self, start=0):
+    self.running = start
+    self.lock = threading.Lock()
+
+  def get(self):
+    return self.running
+
+  def increment(self, value=1):
+    with self.lock:
+      self.running += value
+      return self.running
+
+
+class TestModelManager(unittest.TestCase):
+  def setUp(self):
+    """Force reset the Singleton ModelManager before every test."""
+    ModelManager._instance = None
Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747632626


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import gc
+import heapq
+import itertools
+import logging
+import subprocess
+import threading
+import time
+from collections import Counter
+from collections import OrderedDict
+from collections import defaultdict
+from collections import deque
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Optional
+from typing import Tuple
+
+import numpy as np
+import torch
+from scipy.optimize import nnls
+
+from apache_beam.utils import multi_process_shared
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+    fallback_memory_mb: Default total memory if hardware detection fails.
+    poll_interval: Seconds between memory checks.
+    peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    self._gpu_available = self._detect_hardware()
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def
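Outside the diff context, the sliding-window peak tracking that the quoted `refresh()` implements with a timestamped deque can be sketched standalone. This is a minimal illustrative sketch, not the Beam implementation; `PeakWindow` is an invented name, and samples carry explicit timestamps so the behavior is deterministic:

```python
from collections import deque


class PeakWindow:
  """Tracks the max of samples observed within the last `window_seconds`."""
  def __init__(self, window_seconds=30.0):
    self.window = window_seconds
    self.history = deque()  # (timestamp, value) pairs, oldest first

  def add(self, value, now):
    self.history.append((now, value))
    # Drop samples older than the window, then the peak is the max remaining.
    while self.history and now - self.history[0][0] > self.window:
      self.history.popleft()
    return max(v for _, v in self.history)


w = PeakWindow(window_seconds=30.0)
w.add(100.0, now=0.0)
peak = w.add(500.0, now=10.0)   # both samples in window -> 500.0
peak2 = w.add(200.0, now=45.0)  # the 500.0 sample has aged out -> 200.0
```

The deque keeps eviction O(1) per sample at the old end, which is why the quoted code uses `popleft()` rather than rebuilding a list on every poll.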

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747631106



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747629755



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747628526



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747578101


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+from apache_beam.utils import multi_process_shared
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+    fallback_memory_mb: Default total memory if hardware detection fails.
+    poll_interval: Seconds between memory checks.
+    peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    self._gpu_available = self._detect_hardware()
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subproce
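The quote cuts off inside `_get_nvidia_smi_used`, which queries `memory.free`; used memory would then be derived as total minus free. A hedged sketch of that derivation (illustrative only; `parse_used_mb` is not part of the PR, and it assumes the `csv,noheader,nounits` format used in the quoted code, i.e. one bare number per GPU per line):

```python
def parse_used_mb(total_csv: str, free_csv: str) -> float:
  """Derive used GPU memory (MB) from two nvidia-smi query outputs.

  Both arguments are outputs of nvidia-smi run with
  --format=csv,noheader,nounits; only the first GPU's line is considered.
  """
  total = float(total_csv.strip().splitlines()[0])
  free = float(free_csv.strip().splitlines()[0])
  return total - free


# e.g. a 16384 MB card reporting 12000 MB free:
used = parse_used_mb("16384\n", "12000\n")  # -> 4384.0
```

Deriving usage from `memory.free` rather than `memory.used` has the side effect of attributing all non-free memory (including other processes) to the monitor, which matches the whole-GPU accounting the manager needs.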

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747575188



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747559629


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+from apache_beam.utils import multi_process_shared
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+    fallback_memory_mb: Default total memory if hardware detection fails.
+    poll_interval: Seconds between memory checks.
+    peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._gpu_available = False
+    self._thread = None
+    self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
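Note that `_detect_hardware` passes the raw nvidia-smi output straight to `float()`, which raises on multi-GPU hosts where one value is printed per line (the generic `except` then falls back to the default). A minimal sketch of a more tolerant parser; the helper name is illustrative and not part of this PR:

```python
def parse_total_memory_mb(output: str) -> float:
  """Parse `nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits`.

  nvidia-smi prints one value per line when several GPUs are present;
  summing the lines avoids the ValueError that float() raises on
  multi-line input. Hypothetical helper, shown for illustration only.
  """
  values = [float(line) for line in output.strip().splitlines() if line.strip()]
  if not values:
    raise ValueError("empty nvidia-smi output")
  return sum(values)
```

With a single GPU this reduces to the current behavior (`parse_total_memory_mb("16384")` is `16384.0`), while two 16 GB devices sum to `32768.0`.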
+  def start(self):
+    self._gpu_available = self._detect_hardware()
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
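`start()` targets a `_poll_loop` method that falls outside this excerpt. Assuming it simply calls `refresh()` every `poll_interval` seconds while the running flag holds, the pattern can be sketched as follows (all names here are made up for illustration; the PR's actual `_poll_loop` is not shown):

```python
import time


def make_poll_loop(refresh, poll_interval, is_running):
  """Returns a callable suitable as a daemon-thread target: it invokes
  refresh() until is_running() reports False, sleeping poll_interval
  seconds between polls."""
  def loop():
    while is_running():
      refresh()
      time.sleep(poll_interval)
  return loop
```

Because the thread is a daemon and `stop()` flips the flag before joining, the loop exits at the next flag check rather than blocking shutdown.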
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
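The `refresh()` bookkeeping (append a timestamped sample, evict samples older than the window, take the max of what remains) is a standard sliding-window peak. Isolated for clarity, with an injectable timestamp so it can be exercised deterministically (the class name is ours, not the PR's):

```python
import time
from collections import deque


class SlidingPeak:
  """Sliding-window maximum over timestamped samples, mirroring the
  peak tracking inside GPUMonitor.refresh()."""
  def __init__(self, window_seconds: float):
    self._window = window_seconds
    self._history = deque()

  def add(self, value: float, now: float = None) -> float:
    """Records a sample and returns the peak within the window."""
    now = time.time() if now is None else now
    self._history.append((now, value))
    # Evict samples that fell out of the window.
    while self._history and now - self._history[0][0] > self._window:
      self._history.popleft()
    return max(v for _, v in self._history)
```

For a 30-second window, a spike recorded at t=0 keeps dominating the reported peak until a sample arrives more than 30 seconds later, at which point the peak drops to the largest surviving sample.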
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subproce

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747538599


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747544211


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747533941


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747528037


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


gemini-code-assist[bot] commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747477968


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,730 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-30 Thread via GitHub


damccorm commented on PR #37113:
URL: https://github.com/apache/beam/pull/37113#issuecomment-3825073867

   /gemini review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


codecov[bot] commented on PR #37113:
URL: https://github.com/apache/beam/pull/37113#issuecomment-3802905586

   ## 
[Codecov](https://app.codecov.io/gh/apache/beam/pull/37113?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   :x: Patch coverage is `2.09790%` with `420 lines` in your changes missing 
coverage. Please review.
   :white_check_mark: Project coverage is 39.97%. Comparing base 
([`3ce2abd`](https://app.codecov.io/gh/apache/beam/commit/3ce2abdc505d09569b668d8071f89306ea9aeb81?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache))
 to head 
([`8a856b0`](https://app.codecov.io/gh/apache/beam/commit/8a856b0f57e4907b238d1081e0ead09c7ef464ca?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)).
   :warning: Report is 233 commits behind head on master.
   
   | [Files with missing 
lines](https://app.codecov.io/gh/apache/beam/pull/37113?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[...s/python/apache\_beam/ml/inference/model\_manager.py](https://app.codecov.io/gh/apache/beam/pull/37113?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Fml%2Finference%2Fmodel_manager.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL21vZGVsX21hbmFnZXIucHk=)
 | 2.09% | [420 Missing :warning: 
](https://app.codecov.io/gh/apache/beam/pull/37113?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   Additional details and impacted files
   
   
   
   ```diff
   @@             Coverage Diff              @@
   ##            master    #37113      +/-   ##
   ============================================
   - Coverage     40.38%    39.97%     -0.42%
     Complexity     3476      3476
   ============================================
     Files          1226      1222         -4
     Lines        188339    187813       -526
     Branches       3607      3607
   ============================================
   - Hits          76058     75074       -984
   - Misses       108883    109341       +458
     Partials       3398      3398
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/beam/pull/37113/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[python](https://app.codecov.io/gh/apache/beam/pull/37113/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `39.66% <2.09%> (-1.00%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/beam/pull/37113?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
:rocket: New features to boost your workflow: 
   
   - :snowflake: [Test 
Analytics](https://docs.codecov.com/docs/test-analytics): Detect flaky tests, 
report on failures, and find test suite problems.
   - :package: [JS Bundle 
Analysis](https://docs.codecov.com/docs/javascript-bundle-analysis): Save 
yourself from yourself by tracking and limiting bundle sizes in JS merges.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730080557


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+    fallback_memory_mb: Default total memory if hardware detection fails.
+    poll_interval: Seconds between memory checks.
+    peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    self._gpu_available = self._detect_hardware()
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subprocess.check_output(cmd, shell=True).decode
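
The nvidia-smi query pattern in the quoted `GPUMonitor` code can be reduced to a small standalone sketch. This is illustrative only, not the Beam API: the function name `query_gpu_memory_mb` and the fallback value are assumptions, and the code hedges against `nvidia-smi` being absent or its output being unparseable, as the quoted class does:

```python
import subprocess


def query_gpu_memory_mb(fallback_total: float = 16000.0):
  """Return (used_mb, total_mb), falling back when nvidia-smi is unavailable."""
  cmd = [
      "nvidia-smi",
      "--query-gpu=memory.used,memory.total",
      "--format=csv,noheader,nounits"
  ]
  try:
    # One CSV line per GPU; take the first device.
    line = subprocess.check_output(cmd, text=True).strip().splitlines()[0]
    used, total = (float(x) for x in line.split(","))
    return used, total
  except (FileNotFoundError, subprocess.CalledProcessError, ValueError, IndexError):
    # No GPU / no driver / garbled output: report nothing used, assumed total.
    return 0.0, fallback_total
```

On a machine without an NVIDIA driver this returns `(0.0, fallback_total)` rather than raising, which is the same degrade-gracefully choice the PR's `_detect_hardware` makes.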

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730085433


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@
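
The `refresh` method of the quoted `GPUMonitor` maintains a sliding-window peak: samples older than the window are evicted from a deque before the maximum is taken. The core bookkeeping, extracted as a standalone sketch (the function name and values are illustrative, not part of the Beam module):

```python
from collections import deque


def peak_over_window(history: deque, now: float, window_s: float) -> float:
  """Drop (timestamp, value) samples older than window_s, then return the max."""
  while history and now - history[0][0] > window_s:
    history.popleft()
  return max(v for _, v in history) if history else 0.0


h = deque()
for t, v in [(0.0, 10.0), (1.0, 50.0), (40.0, 20.0)]:
  h.append((t, v))
# At t=40 with a 30s window, the t=0 and t=1 samples have aged out.
print(peak_over_window(h, 40.0, 30.0))  # 20.0
```

Because samples arrive in timestamp order, popping only from the left is sufficient; each sample is appended and evicted at most once, so the amortized cost per `refresh` stays O(1) apart from the max scan.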

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730080960


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@
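
The `start`/`stop` pair in the quoted monitor follows a common daemon-thread polling shape: a boolean flag guards the loop, `start` is idempotent, and `stop` flips the flag and joins. A minimal self-contained sketch of that lifecycle, assuming nothing from the Beam module (the `Poller` class and `ticks` counter are invented for illustration):

```python
import threading
import time


class Poller:
  """Background polling loop with the same start/stop shape as the quoted monitor."""
  def __init__(self, interval: float = 0.01):
    self._interval = interval
    self._running = False
    self._thread = None
    self.ticks = 0

  def _poll_loop(self):
    # Runs until stop() clears the flag; each iteration stands in for one poll.
    while self._running:
      self.ticks += 1
      time.sleep(self._interval)

  def start(self):
    if self._running:
      return  # idempotent: a second start() is a no-op
    self._running = True
    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
    self._thread.start()

  def stop(self):
    self._running = False
    if self._thread:
      self._thread.join()


p = Poller()
p.start()
time.sleep(0.05)
p.stop()
```

Marking the thread `daemon=True`, as the PR does, means a worker that exits without calling `stop` will not hang on the poller.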

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730076360


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730070881


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730069151


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730067529


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730067239


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+self._gpu_available = self._detect_hardware()
+
+  def _detect_hardware(self):
+try:
+  cmd = [
+  "nvidia-smi",
+  "--query-gpu=memory.total",
+  "--format=csv,noheader,nounits"
+  ]
+  output = subprocess.check_output(cmd, text=True).strip()
+  self._total_memory = float(output)
+  return True
+except (FileNotFoundError, subprocess.CalledProcessError):
+  logger.warning(
+  "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+  self._total_memory)
+  return False
+except Exception as e:
+  logger.warning(
+  "Error parsing nvidia-smi output: %s. "
+  "Defaulting total memory to %s MB",
+  e,
+  self._total_memory)
+  return False
+
+  def start(self):
+if self._running or not self._gpu_available:
+  return
+self._running = True
+self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+self._thread.start()
+
+  def stop(self):
+self._running = False
+if self._thread:
+  self._thread.join()
+
+  def reset_peak(self):
+with self._lock:
+  now = time.time()
+  self._memory_history.clear()
+  self._memory_history.append((now, self._current_usage))
+  self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+with self._lock:
+  return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+"""Forces an immediate poll of the GPU."""
+usage = self._get_nvidia_smi_used()
+now = time.time()
+with self._lock:
+  self._current_usage = usage
+  self._memory_history.append((now, usage))
+  # Recalculate peak immediately
+  while self._memory_history and (now - self._memory_history[0][0]
+  > self._peak_window_seconds):
+self._memory_history.popleft()
+  self._peak_usage = (
+  max(m for _, m in self._memory_history)
+  if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+try:
+  cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+  output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+  free_memory = float(output)
+  return self._total_memory - free_memory
+except Exception:
+  return 0.0
+
+  def _poll_loop(self):
+while self._running:
+  usage = self._get_nvidia_smi_used()
+  now = time.time()
+  with self._lock:
+self._current_usage = usage
+self._memory_history.append((now, usage))
+while self._memory_history and (now - self._memory_history[0][0]
+> self._peak_window_seconds):
+  self._memory_history.popleft()
+self._peak_usage = (
+max(m for _, m in self._memory_history)
+if self._memory_history else usage)
+  time.sleep(self._poll_interval)
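[Editor's note: the `start`/`stop`/`_poll_loop` trio quoted above follows a standard stop-flag pattern for a daemon polling thread. A compressed sketch of that contract — a generic poller, not the Beam class:]

```python
import threading
import time

class Poller:
    """Background poller with the same start/stop contract as the quoted
    GPUMonitor: a daemon thread, a running flag, and join() on stop."""
    def __init__(self, interval: float = 0.01):
        self._interval = interval
        self._running = False
        self._thread = None
        self.samples = 0

    def _poll_loop(self):
        while self._running:
            self.samples += 1          # stand-in for the nvidia-smi query
            time.sleep(self._interval)

    def start(self):
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._running = False
        if self._thread:
            self._thread.join()

p = Poller()
p.start()
time.sleep(0.05)
p.stop()
print(p.samples > 0)  # True: at least one poll ran before stop()
```

The daemon flag means the thread cannot keep a worker process alive if `stop` is never called; `join` in `stop` guarantees no poll runs after shutdown.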

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-26 Thread via GitHub


damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2729477496


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,704 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+fallback_memory_mb: Default total memory if hardware detection fails.
+poll_interval: Seconds between memory checks.
+peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+try:
+  cmd = [
+  "nvidia-smi",
+  "--query-gpu=memory.total",
+  "--format=csv,noheader,nounits"
+  ]
+  output = subprocess.check_output(cmd, text=True).strip()
+  self._total_memory = float(output)
+  return True
+except (FileNotFoundError, subprocess.CalledProcessError):
+  logger.warning(
+  "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+  self._total_memory)
+  return False
+except Exception as e:
+  logger.warning(
+  "Error parsing nvidia-smi output: %s. "
+  "Defaulting total memory to %s MB",
+  e,
+  self._total_memory)
+  return False
+
+  def start(self):
+self._gpu_available = self._detect_hardware()
+if self._running or not self._gpu_available:
+  return
+self._running = True
+self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+self._thread.start()
+
+  def stop(self):
+self._running = False
+if self._thread:
+  self._thread.join()
+
+  def reset_peak(self):
+with self._lock:
+  now = time.time()
+  self._memory_history.clear()
+  self._memory_history.append((now, self._current_usage))
+  self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+with self._lock:
+  return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+"""Forces an immediate poll of the GPU."""
+usage = self._get_nvidia_smi_used()
+now = time.time()
+with self._lock:
+  self._current_usage = usage
+  self._memory_history.append((now, usage))
+  # Recalculate peak immediately
+  while self._memory_history and (now - self._memory_history[0][0]
+  > self._peak_window_seconds):
+self._memory_history.popleft()
+  self._peak_usage = (
+  max(m for _, m in self._memory_history)
+  if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+try:
+  cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+  output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+  free_memory = float(output)
+  return self._total_memory - free_memory
+except Exception:
+  return 0.0
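[Editor's note: `_get_nvidia_smi_used` above parses the CSV output as a single float, which assumes exactly one GPU; on multi-GPU machines `nvidia-smi` emits one line per device. A sketch of a parser tolerating either shape — a hypothetical helper, not in the PR:]

```python
def parse_free_memory_mb(smi_output: str) -> float:
    """Sum memory values (MB) across all lines of
    `nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits`."""
    lines = [ln.strip() for ln in smi_output.strip().splitlines()]
    return sum(float(ln) for ln in lines if ln)

print(parse_free_memory_mb("15109\n"))         # 15109.0 (single GPU)
print(parse_free_memory_mb("15109\n15109\n"))  # 30218.0 (two GPUs)
```

Summing across devices is one possible policy; per-device tracking would be needed if models are pinned to specific GPUs.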

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-23 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2723093557


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+self._gpu_available = self._detect_hardware()
+
+  def _detect_hardware(self):
+try:
+  cmd = [
+  "nvidia-smi",
+  "--query-gpu=memory.total",
+  "--format=csv,noheader,nounits"
+  ]
+  output = subprocess.check_output(cmd, text=True).strip()
+  self._total_memory = float(output)
+  return True
+except (FileNotFoundError, subprocess.CalledProcessError):
+  logger.warning(
+  "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+  self._total_memory)
+  return False
+except Exception as e:
+  logger.warning(
+  "Error parsing nvidia-smi output: %s. "
+  "Defaulting total memory to %s MB",
+  e,
+  self._total_memory)
+  return False
+
+  def start(self):
+if self._running or not self._gpu_available:
+  return
+self._running = True
+self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+self._thread.start()
+
+  def stop(self):
+self._running = False
+if self._thread:
+  self._thread.join()
+
+  def reset_peak(self):
+with self._lock:
+  now = time.time()
+  self._memory_history.clear()
+  self._memory_history.append((now, self._current_usage))
+  self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+with self._lock:
+  return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+"""Forces an immediate poll of the GPU."""
+usage = self._get_nvidia_smi_used()
+now = time.time()
+with self._lock:
+  self._current_usage = usage
+  self._memory_history.append((now, usage))
+  # Recalculate peak immediately
+  while self._memory_history and (now - self._memory_history[0][0]
+  > self._peak_window_seconds):
+self._memory_history.popleft()
+  self._peak_usage = (
+  max(m for _, m in self._memory_history)
+  if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+try:
+  cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+  output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+  free_memory = float(output)
+  return self._total_memory - free_memory
+except Exception:
+  return 0.0
+
+  def _poll_loop(self):
+while self._running:
+  usage = self._get_nvidia_smi_used()
+  now = time.time()
+  with self._lock:
+self._current_usage = usage
+self._memory_history.append((now, usage))
+while self._memory_history and (now - self._memory_history[0][0]
+> self._peak_window_seconds):
+  self._memory_history.popleft()
+self._peak_usage = (
+max(m for _, m in self._memory_history)
+if self._memory_history else usage)
+  time.sleep(self._poll_interval)

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-23 Thread via GitHub


AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2723092867


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+self._gpu_available = self._detect_hardware()

Review Comment:
   Done.



##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+self._gpu_available = self._detect_hardware()
+
+  def _detect_hardware(self):
+try:
+  cmd = [
+  "nvidia-smi",
+  "--query-gpu=memory.total",
+  "--format=csv,noheader,nounits"
+  ]
+  output = subprocess.check_output(cmd, text=True).strip()
+  self._total_memory = float(output)
+  return True
+except (FileNotFoundError, subprocess.CalledProcessError):
+  logger.warning(
+  "nvidia-smi not found or failed. Defaulting total memor

Re: [PR] Add model manager that automatically manage model across processes [beam]

2026-01-13 Thread via GitHub


damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2687842957


##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+self._gpu_available = self._detect_hardware()

Review Comment:
   I think it would be better to defer this logic to the first time we call 
`start`. That will ensure that if this object gets created at job submission, 
we still look in the execution environment for the GPU.
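
[Editor's note: the deferral suggested here amounts to a detect-on-first-start guard. A minimal sketch of that shape — a hypothetical class, assuming detection is cheap enough to run inline:]

```python
import threading

class DeferredDetectMonitor:
    """Defers hardware detection to the first start() call, so a monitor
    constructed at job submission probes the *execution* environment."""
    def __init__(self):
        self._detected = False
        self._gpu_available = False
        self._lock = threading.Lock()

    def _detect_hardware(self) -> bool:
        # Stand-in for the nvidia-smi probe; returns False off-GPU.
        return False

    def start(self) -> bool:
        with self._lock:
            if not self._detected:      # probe once, lazily
                self._gpu_available = self._detect_hardware()
                self._detected = True
        return self._gpu_available

m = DeferredDetectMonitor()
assert not m._detected                  # nothing probed at construction
m.start()
print(m._detected)  # True: detection ran on first start()
```

The lock and `_detected` flag keep the probe from running twice if `start` is called concurrently.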



##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+  self,
+  fallback_memory_mb: float = 16000.0,
+  poll_interval: float = 0.5,
+  peak_window_seconds: float = 30.0):
+self._current_usage = 0.0
+self._peak_usage = 0.0
+self._total_memory = fallback_memory_mb
+self._poll_interval = poll_interval
+self._peak_window_seconds = peak_window_seconds
+self._memory_history = deque()
+self._running = False
+self._thread = None
+self._lock = threading.Lock()
+self._gpu_available = self._detect_hardware()
+
+  def _detect_hardware(self):
+try:
+  cmd = [
+  "nvidia-smi",
+  "--query-gpu=memory.total",
+  "--format=csv,noheader,nounits"
+  ]
+  output = subprocess.check_output(cmd, text=True).strip()
+  self._total_memory = float(output)
+  return True

Re: [PR] Add model manager that automatically manage model across processes [beam]

2025-12-15 Thread via GitHub


github-actions[bot] commented on PR #37113:
URL: https://github.com/apache/beam/pull/37113#issuecomment-3657783339

   Stopping reviewer notifications for this pull request: review requested by 
someone other than the bot, ceding control. If you'd like to restart, comment 
`assign set of reviewers`




Re: [PR] Add model manager that automatically manage model across processes [beam]

2025-12-15 Thread via GitHub


AMOOOMA commented on PR #37113:
URL: https://github.com/apache/beam/pull/37113#issuecomment-3657768250

   R: @damccorm




Re: [PR] Add model manager that automatically manage model across processes [beam]

2025-12-15 Thread via GitHub


gemini-code-assist[bot] commented on PR #37113:
URL: https://github.com/apache/beam/pull/37113#issuecomment-3657768014

   ## Summary of Changes
   
   Hello @AMOOOMA, I'm Gemini Code Assist! I'm currently reviewing this 
pull request and will post my feedback shortly. In the meantime, here's a 
summary to help you and other reviewers quickly get up to speed!
   
   This pull request introduces a sophisticated model management system for 
Apache Beam's ML inference capabilities. The core "ModelManager" class, 
supported by "GPUMonitor" and "ResourceEstimator", intelligently handles the 
lifecycle of machine learning models, particularly on GPU-accelerated 
environments. It aims to prevent out-of-memory errors by dynamically estimating 
model memory requirements, isolating unknown models for profiling, and 
implementing a demand-aware eviction strategy. This system ensures efficient 
and concurrent execution of diverse ML models within Beam pipelines, optimizing 
GPU resource utilization and improving overall stability.
   
   ### Highlights
   
   * **ModelManager Introduction**: A new "ModelManager" class is added to 
provide managed access to ML models, handling GPU OOMs and optimizing resource 
usage.
   * **GPUMonitor for Memory Tracking**: Implements "GPUMonitor" to 
continuously poll and track GPU memory usage, including current, peak, and 
total memory.
   * **ResourceEstimator for Cost Estimation**: Introduces "ResourceEstimator" 
which uses Non-Negative Least Squares (NNLS) to dynamically estimate the memory 
cost of models and adapt to fluctuating usage.
   * **Intelligent Model Eviction**: The "ModelManager" includes an eviction 
strategy that prioritizes models based on demand, age, and surplus copies to 
free up GPU memory when needed.
   * **Isolation Mode for Unknown Models**: Unknown models are loaded in an 
isolated environment to accurately profile their memory footprint without 
affecting other active models.
   * **Comprehensive Testing**: New unit tests are added to validate the 
functionality of "GPUMonitor", "ResourceEstimator", and "ModelManager", 
covering capacity checks, isolation, concurrent execution, OOM recovery, and 
eviction logic.
   
   
   
   
   
   
   
   

