Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm merged PR #37113:
URL: https://github.com/apache/beam/pull/37113

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2765132216
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,747 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import gc
+import heapq
+import itertools
+import logging
+import subprocess
+import threading
+import time
+from collections import Counter
+from collections import OrderedDict
+from collections import defaultdict
+from collections import deque
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Optional
+from typing import Tuple
+
+import numpy as np
+import torch
+from scipy.optimize import nnls
+
+from apache_beam.utils import multi_process_shared
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  """Monitors GPU memory usage in a separate thread using nvidia-smi.
+
+  This class continuously polls GPU memory statistics to track current usage
+  and peak usage over a sliding time window. It serves as the source of truth
+  for the ModelManager's resource decisions.
+
+  Attributes:
+    fallback_memory_mb: Default total memory if hardware detection fails.
+    poll_interval: Seconds between memory checks.
+    peak_window_seconds: Duration to track peak memory usage.
+  """
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    self._gpu_available = self._detect_hardware()
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2764995461
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,747 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2761494084
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2761483860
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2755802583
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2755798969
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747636002
##
sdks/python/apache_beam/ml/inference/model_manager_test.py:
##
@@ -0,0 +1,596 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import threading
+import time
+import unittest
+from concurrent.futures import ThreadPoolExecutor
+from unittest.mock import patch
+
+from apache_beam.utils import multi_process_shared
+
+try:
+  from apache_beam.ml.inference.model_manager import GPUMonitor
+  from apache_beam.ml.inference.model_manager import ModelManager
+  from apache_beam.ml.inference.model_manager import ResourceEstimator
+except ImportError as e:
+  raise unittest.SkipTest("Model Manager dependencies are not installed")
+
+
+class MockGPUMonitor:
+  """
+  Simulates GPU hardware with cumulative memory tracking.
+  Allows simulating specific allocation spikes and baseline usage.
+  """
+  def __init__(self, total_memory=12000.0, peak_window: int = 5):
+    self._current = 0.0
+    self._peak = 0.0
+    self._total = total_memory
+    self._lock = threading.Lock()
+    self.running = False
+    self.history = []
+    self.peak_window = peak_window
+
+  def start(self):
+    self.running = True
+
+  def stop(self):
+    self.running = False
+
+  def get_stats(self):
+    with self._lock:
+      return self._current, self._peak, self._total
+
+  def reset_peak(self):
+    with self._lock:
+      self._peak = self._current
+      self.history = [self._current]
+
+  def set_usage(self, current_mb):
+    """Sets absolute usage (legacy helper)."""
+    with self._lock:
+      self._current = current_mb
+      self._peak = max(self._peak, current_mb)
+
+  def allocate(self, amount_mb):
+    """Simulates memory allocation (e.g., tensors loaded to VRAM)."""
+    with self._lock:
+      self._current += amount_mb
+      self.history.append(self._current)
+      if len(self.history) > self.peak_window:
+        self.history.pop(0)
+      self._peak = max(self.history)
+
+  def free(self, amount_mb):
+    """Simulates memory freeing (not used often if pooling is active)."""
+    with self._lock:
+      self._current = max(0.0, self._current - amount_mb)
+      self.history.append(self._current)
+      if len(self.history) > self.peak_window:
+        self.history.pop(0)
+      self._peak = max(self.history)
+
+  def refresh(self):
+    """Simulates a refresh of the monitor stats (no-op for mock)."""
+    pass
+
+
+class MockModel:
+  def __init__(self, name, size, monitor):
+    self.name = name
+    self.size = size
+    self.monitor = monitor
+    self.deleted = False
+    self.monitor.allocate(size)
+
+  def mock_model_unsafe_hard_delete(self):
+    if not self.deleted:
+      self.monitor.free(self.size)
+      self.deleted = True
+
+
+class Counter(object):
+  def __init__(self, start=0):
+    self.running = start
+    self.lock = threading.Lock()
+
+  def get(self):
+    return self.running
+
+  def increment(self, value=1):
+    with self.lock:
+      self.running += value
+      return self.running
+
+
+class TestModelManager(unittest.TestCase):
+  def setUp(self):
+    """Force reset the Singleton ModelManager before every test."""
+    ModelManager._instance = None
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
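(Aside: a minimal sketch of the sliding-window behavior of the MockGPUMonitor from the test excerpt above, assuming that class definition is in scope; the small peak_window here is chosen only for illustration. The mock keeps the last peak_window samples and reports their maximum as the peak, so older spikes age out of the window.)

monitor = MockGPUMonitor(total_memory=12000.0, peak_window=3)
monitor.allocate(4000)      # history [4000.0], peak 4000
monitor.allocate(2000)      # history [4000.0, 6000.0], peak 6000
monitor.free(5000)          # history [4000.0, 6000.0, 1000.0], peak still 6000
print(monitor.get_stats())  # (1000.0, 6000.0, 12000.0)
monitor.free(500)           # history [6000.0, 1000.0, 500.0]
monitor.allocate(100)       # history [1000.0, 500.0, 600.0]; the 6000 sample aged out
print(monitor.get_stats())  # (600.0, 1000.0, 12000.0)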
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747632626
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,730 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747631106
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,730 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747629755
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,730 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747628526
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,730 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747578101
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2747575188
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,721 @@
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2747559629 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2747538599 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2747544211 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2747533941 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2747528037 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
gemini-code-assist[bot] commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2747477968 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm commented on PR #37113: URL: https://github.com/apache/beam/pull/37113#issuecomment-3825073867 /gemini review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Add model manager that automatically manage model across processes [beam]
codecov[bot] commented on PR #37113: URL: https://github.com/apache/beam/pull/37113#issuecomment-3802905586

## [Codecov](https://app.codecov.io/gh/apache/beam/pull/37113?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

:x: Patch coverage is `2.09790%` with `420 lines` in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 39.97%. Comparing base ([`3ce2abd`](https://app.codecov.io/gh/apache/beam/commit/3ce2abdc505d09569b668d8071f89306ea9aeb81?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)) to head ([`8a856b0`](https://app.codecov.io/gh/apache/beam/commit/8a856b0f57e4907b238d1081e0ead09c7ef464ca?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)).
:warning: Report is 233 commits behind head on master.

| [Files with missing lines](https://app.codecov.io/gh/apache/beam/pull/37113?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...s/python/apache\_beam/ml/inference/model\_manager.py](https://app.codecov.io/gh/apache/beam/pull/37113?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Fml%2Finference%2Fmodel_manager.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL21vZGVsX21hbmFnZXIucHk=) | 2.09% | [420 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/37113?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master   #37113      +/-   ##
- Coverage   40.38%   39.97%    -0.42%
  Complexity   3476     3476
  Files        1226     1222        -4
  Lines      188339   187813      -526
  Branches     3607     3607
- Hits        76058    75074      -984
- Misses     108883   109341      +458
  Partials     3398     3398
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/37113/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [python](https://app.codecov.io/gh/apache/beam/pull/37113/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `39.66% <2.09%> (-1.00%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/37113?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
:loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).

:rocket: New features to boost your workflow:
- :snowflake: [Test Analytics](https://docs.codecov.com/docs/test-analytics): Detect flaky tests, report on failures, and find test suite problems.
- :package: [JS Bundle Analysis](https://docs.codecov.com/docs/javascript-bundle-analysis): Save yourself from yourself by tracking and limiting bundle sizes in JS merges. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730080557 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730085433 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730080960 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730076360 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730070881 ## sdks/python/apache_beam/ml/inference/model_manager.py ##
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730069151 ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,704 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. +""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + """Monitors GPU memory usage in a separate thread using nvidia-smi. + + This class continuously polls GPU memory statistics to track current usage + and peak usage over a sliding time window. It serves as the source of truth + for the ModelManager's resource decisions. + + Attributes: +fallback_memory_mb: Default total memory if hardware detection fails. +poll_interval: Seconds between memory checks. +peak_window_seconds: Duration to track peak memory usage. + """ + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() + + def _detect_hardware(self): +try: + cmd = [ + "nvidia-smi", + "--query-gpu=memory.total", + "--format=csv,noheader,nounits" + ] + output = subprocess.check_output(cmd, text=True).strip() + self._total_memory = float(output) + return True +except (FileNotFoundError, subprocess.CalledProcessError): + logger.warning( + "nvidia-smi not found or failed. Defaulting total memory to %s MB", + self._total_memory) + return False +except Exception as e: + logger.warning( + "Error parsing nvidia-smi output: %s. 
" + "Defaulting total memory to %s MB", + e, + self._total_memory) + return False + + def start(self): +self._gpu_available = self._detect_hardware() +if self._running or not self._gpu_available: + return +self._running = True +self._thread = threading.Thread(target=self._poll_loop, daemon=True) +self._thread.start() + + def stop(self): +self._running = False +if self._thread: + self._thread.join() + + def reset_peak(self): +with self._lock: + now = time.time() + self._memory_history.clear() + self._memory_history.append((now, self._current_usage)) + self._peak_usage = self._current_usage + + def get_stats(self) -> Tuple[float, float, float]: +with self._lock: + return self._current_usage, self._peak_usage, self._total_memory + + def refresh(self): +"""Forces an immediate poll of the GPU.""" +usage = self._get_nvidia_smi_used() +now = time.time() +with self._lock: + self._current_usage = usage + self._memory_history.append((now, usage)) + # Recalculate peak immediately + while self._memory_history and (now - self._memory_history[0][0] + > self._peak_window_seconds): +self._memory_history.popleft() + self._peak_usage = ( + max(m for _, m in self._memory_history) + if self._memory_history else usage) + + def _get_nvidia_smi_used(self) -> float: +try: + cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits" + output = subprocess.check_output(cmd, shell=True).decode
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2730067529 ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,704 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. +""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + """Monitors GPU memory usage in a separate thread using nvidia-smi. + + This class continuously polls GPU memory statistics to track current usage + and peak usage over a sliding time window. It serves as the source of truth + for the ModelManager's resource decisions. + + Attributes: +fallback_memory_mb: Default total memory if hardware detection fails. +poll_interval: Seconds between memory checks. +peak_window_seconds: Duration to track peak memory usage. + """ + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() + + def _detect_hardware(self): +try: + cmd = [ + "nvidia-smi", + "--query-gpu=memory.total", + "--format=csv,noheader,nounits" + ] + output = subprocess.check_output(cmd, text=True).strip() + self._total_memory = float(output) + return True +except (FileNotFoundError, subprocess.CalledProcessError): + logger.warning( + "nvidia-smi not found or failed. Defaulting total memory to %s MB", + self._total_memory) + return False +except Exception as e: + logger.warning( + "Error parsing nvidia-smi output: %s. 
" + "Defaulting total memory to %s MB", + e, + self._total_memory) + return False + + def start(self): +self._gpu_available = self._detect_hardware() +if self._running or not self._gpu_available: + return +self._running = True +self._thread = threading.Thread(target=self._poll_loop, daemon=True) +self._thread.start() + + def stop(self): +self._running = False +if self._thread: + self._thread.join() + + def reset_peak(self): +with self._lock: + now = time.time() + self._memory_history.clear() + self._memory_history.append((now, self._current_usage)) + self._peak_usage = self._current_usage + + def get_stats(self) -> Tuple[float, float, float]: +with self._lock: + return self._current_usage, self._peak_usage, self._total_memory + + def refresh(self): +"""Forces an immediate poll of the GPU.""" +usage = self._get_nvidia_smi_used() +now = time.time() +with self._lock: + self._current_usage = usage + self._memory_history.append((now, usage)) + # Recalculate peak immediately + while self._memory_history and (now - self._memory_history[0][0] + > self._peak_window_seconds): +self._memory_history.popleft() + self._peak_usage = ( + max(m for _, m in self._memory_history) + if self._memory_history else usage) + + def _get_nvidia_smi_used(self) -> float: +try: + cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits" + output = subprocess.check_output(cmd, shell=True).decode
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2730067239
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+    self._gpu_available = self._detect_hardware()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+      free_memory = float(output)
+      return self._total_memory - free_memory
+    except Exception:
+      return 0.0
+
+  def _poll_loop(self):
+    while self._running:
+      usage = self._get_nvidia_smi_used()
+      now = time.time()
+      with self._lock:
+        self._current_usage = usage
+        self._memory_history.append((now, usage))
+        while self._memory_history and (now - self._memory_history[0][0]
+
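Editorial note: based on the methods quoted in the diff above, the monitor's lifecycle is start, reset_peak before a measurement of interest, get_stats to read (current, peak, total) in MB, then stop. A hedged usage sketch follows; it assumes the class is importable from the module added in this PR and that nvidia-smi is available on the worker (otherwise start() is a no-op).

```python
# Sketch only; assumes the GPUMonitor API shown in the diff above.
from apache_beam.ml.inference.model_manager import GPUMonitor

monitor = GPUMonitor(poll_interval=0.5, peak_window_seconds=30.0)
monitor.start()       # background polling thread; no-op without a GPU
monitor.reset_peak()  # begin a fresh peak window, e.g. before loading a model

# ... load a model / run a batch of inference here ...

current_mb, peak_mb, total_mb = monitor.get_stats()
print(f"current={current_mb:.0f} MB peak={peak_mb:.0f} MB total={total_mb:.0f} MB")
monitor.stop()
```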
Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2729477496 ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,704 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. +""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + """Monitors GPU memory usage in a separate thread using nvidia-smi. + + This class continuously polls GPU memory statistics to track current usage + and peak usage over a sliding time window. It serves as the source of truth + for the ModelManager's resource decisions. + + Attributes: +fallback_memory_mb: Default total memory if hardware detection fails. +poll_interval: Seconds between memory checks. +peak_window_seconds: Duration to track peak memory usage. + """ + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() + + def _detect_hardware(self): +try: + cmd = [ + "nvidia-smi", + "--query-gpu=memory.total", + "--format=csv,noheader,nounits" + ] + output = subprocess.check_output(cmd, text=True).strip() + self._total_memory = float(output) + return True +except (FileNotFoundError, subprocess.CalledProcessError): + logger.warning( + "nvidia-smi not found or failed. Defaulting total memory to %s MB", + self._total_memory) + return False +except Exception as e: + logger.warning( + "Error parsing nvidia-smi output: %s. 
" + "Defaulting total memory to %s MB", + e, + self._total_memory) + return False + + def start(self): +self._gpu_available = self._detect_hardware() +if self._running or not self._gpu_available: + return +self._running = True +self._thread = threading.Thread(target=self._poll_loop, daemon=True) +self._thread.start() + + def stop(self): +self._running = False +if self._thread: + self._thread.join() + + def reset_peak(self): +with self._lock: + now = time.time() + self._memory_history.clear() + self._memory_history.append((now, self._current_usage)) + self._peak_usage = self._current_usage + + def get_stats(self) -> Tuple[float, float, float]: +with self._lock: + return self._current_usage, self._peak_usage, self._total_memory + + def refresh(self): +"""Forces an immediate poll of the GPU.""" +usage = self._get_nvidia_smi_used() +now = time.time() +with self._lock: + self._current_usage = usage + self._memory_history.append((now, usage)) + # Recalculate peak immediately + while self._memory_history and (now - self._memory_history[0][0] + > self._peak_window_seconds): +self._memory_history.popleft() + self._peak_usage = ( + max(m for _, m in self._memory_history) + if self._memory_history else usage) + + def _get_nvidia_smi_used(self) -> float: +try: + cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits" + output = subprocess.check_output(cmd, shell=True).decod
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2723093557
##
sdks/python/apache_beam/ml/inference/model_manager.py:
##
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+    self._gpu_available = self._detect_hardware()
+
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+      free_memory = float(output)
+      return self._total_memory - free_memory
+    except Exception:
+      return 0.0
+
+  def _poll_loop(self):
+    while self._running:
+      usage = self._get_nvidia_smi_used()
+      now = time.time()
+      with self._lock:
+        self._current_usage = usage
+        self._memory_history.append((now, usage))
+        while self._memory_history and (now - self._memory_history[0][0]
+
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2723092867 ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,669 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. +""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() +self._gpu_available = self._detect_hardware() Review Comment: Done. ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,669 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. 
+""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() +self._gpu_available = self._detect_hardware() + + def _detect_hardware(self): +try: + cmd = [ + "nvidia-smi", + "--query-gpu=memory.total", + "--format=csv,noheader,nounits" + ] + output = subprocess.check_output(cmd, text=True).strip() + self._total_memory = float(output) + return True +except (FileNotFoundError, subprocess.CalledProcessError): + logger.warning( + "nvidia-smi not found or failed. Defaulting total memor
Re: [PR] Add model manager that automatically manage model across processes [beam]
damccorm commented on code in PR #37113: URL: https://github.com/apache/beam/pull/37113#discussion_r2687842957 ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,669 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. +""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() +self._gpu_available = self._detect_hardware() Review Comment: I think it would be better to defer this logic to the first time we call `start`. That will ensure that if this object gets created at job submission, we still look in the execution environment for the GPU ## sdks/python/apache_beam/ml/inference/model_manager.py: ## @@ -0,0 +1,669 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module for managing ML models in Apache Beam pipelines. + +This module provides classes and functions to efficiently manage multiple +machine learning models within Apache Beam pipelines. 
It includes functionality +for loading, caching, and updating models using multi-process shared memory, +ensuring that models are reused across different workers to optimize resource +usage and performance. +""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): +self._current_usage = 0.0 +self._peak_usage = 0.0 +self._total_memory = fallback_memory_mb +self._poll_interval = poll_interval +self._peak_window_seconds = peak_window_seconds +self._memory_history = deque() +self._running = False +self._thread = None +self._lock = threading.Lock() +self._gpu_available = self._detect_hardware() + + def _detect_hardware(self): +try: + cmd = [ + "nvidia-smi", + "--query-gpu=memory.total", + "--format=csv,noheader,nounits" + ] + output = subprocess.check_output(cmd, text=True).strip() + self
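Editorial note: the suggestion above is to probe the hardware lazily so that a monitor constructed at job submission time still detects the GPU of the execution environment. The later revision quoted earlier in this thread does exactly that by moving the _detect_hardware() call from __init__ into start(). A minimal sketch of the pattern, with an illustrative class name:

```python
import logging
import subprocess
import threading

logger = logging.getLogger(__name__)


class LazyGPUMonitor:
  """Sketch of deferring GPU detection to the first start() call."""
  def __init__(self, fallback_memory_mb: float = 16000.0):
    self._total_memory = fallback_memory_mb
    self._gpu_available = False  # unknown until start() runs on the worker
    self._running = False
    self._thread = None

  def _detect_hardware(self) -> bool:
    try:
      out = subprocess.check_output(
          ["nvidia-smi", "--query-gpu=memory.total",
           "--format=csv,noheader,nounits"],
          text=True).strip()
      self._total_memory = float(out)
      return True
    except (FileNotFoundError, subprocess.CalledProcessError):
      logger.warning("nvidia-smi unavailable; assuming no GPU")
      return False

  def _poll_loop(self):
    pass  # polling loop elided in this sketch

  def start(self):
    # Probing happens here, in the execution environment, not at submission.
    self._gpu_available = self._detect_hardware()
    if self._running or not self._gpu_available:
      return
    self._running = True
    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
    self._thread.start()
```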
Re: [PR] Add model manager that automatically manage model across processes [beam]
github-actions[bot] commented on PR #37113: URL: https://github.com/apache/beam/pull/37113#issuecomment-3657783339 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment `assign set of reviewers` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Add model manager that automatically manage model across processes [beam]
AMOOOMA commented on PR #37113: URL: https://github.com/apache/beam/pull/37113#issuecomment-3657768250 R: @damccorm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Add model manager that automatically manage model across processes [beam]
gemini-code-assist[bot] commented on PR #37113: URL: https://github.com/apache/beam/pull/37113#issuecomment-3657768014

## Summary of Changes

Hello @AMOOOMA, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a sophisticated model management system for Apache Beam's ML inference capabilities. The core "ModelManager" class, supported by "GPUMonitor" and "ResourceEstimator", intelligently handles the lifecycle of machine learning models, particularly on GPU-accelerated environments. It aims to prevent out-of-memory errors by dynamically estimating model memory requirements, isolating unknown models for profiling, and implementing a demand-aware eviction strategy. This system ensures efficient and concurrent execution of diverse ML models within Beam pipelines, optimizing GPU resource utilization and improving overall stability.

### Highlights

* **ModelManager Introduction**: A new "ModelManager" class is added to provide managed access to ML models, handling GPU OOMs and optimizing resource usage.
* **GPUMonitor for Memory Tracking**: Implements "GPUMonitor" to continuously poll and track GPU memory usage, including current, peak, and total memory.
* **ResourceEstimator for Cost Estimation**: Introduces "ResourceEstimator" which uses Non-Negative Least Squares (NNLS) to dynamically estimate the memory cost of models and adapt to fluctuating usage.
* **Intelligent Model Eviction**: The "ModelManager" includes an eviction strategy that prioritizes models based on demand, age, and surplus copies to free up GPU memory when needed.
* **Isolation Mode for Unknown Models**: Unknown models are loaded in an isolated environment to accurately profile their memory footprint without affecting other active models.
* **Comprehensive Testing**: New unit tests are added to validate the functionality of "GPUMonitor", "ResourceEstimator", and "ModelManager", covering capacity checks, isolation, concurrent execution, OOM recovery, and eviction logic.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our [documentation page](https://developers.google.com/gemini-code-assist/docs/review-github-code), here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either `/gemini ` or `@gemini-code-assist `. Below is a summary of the supported commands on the current page.

Feature | Command | Description
--- | --- | ---
Code Review | `/gemini review` | Performs a code review for the current pull request in its current state.
Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state.
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help | `/gemini help` | Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a `.gemini/` folder in the base of the repository. Detailed instructions can be found [here](https://developers.google.com/gemini-code-assist/docs/customize-gemini-behavior-github).

Limitations & Feedback

Gemini Code Assist may make mistakes.
Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with :thumbsup: and :thumbsdown: on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up [here](https://google.qualtrics.com/jfe/form/SV_2cyuGuTWsEw84yG). You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the [Gemini Code Assist IDE Extension](https://cloud.google.com/products/gemini/code-assist). [^1]: Review the [Privacy Notices](https://policies.google.com/privacy), [Generative AI Prohibited Use Policy](https://policies.google.com/terms/generative-ai/use-policy), [Terms of Service](https://policies.google.com/terms), and learn how to configure Gemini Code Assist in GitHub [here](https://developers.google.com/gemini-code-assist/docs/customize-gemini-behavior-github). Gemini can make mistakes, so double check it and [use code with caution](https://support.google.com/legal/answer/13505487). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail
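Editorial note: the Highlights above say the ResourceEstimator fits per-model memory costs with non-negative least squares, and the module does import scipy.optimize.nnls. The idea can be illustrated with a toy system: each observation is the total GPU usage while some mix of model copies was resident, which gives A x ≈ b, where each column of A counts copies of one model and x is the per-copy cost, solved with NNLS so the costs stay non-negative. The numbers and variable names below are illustrative and are not taken from the PR's actual implementation.

```python
import numpy as np
from scipy.optimize import nnls

# Rows: memory observations; columns: how many copies of each model were
# resident at that time. Values are made up for illustration.
A = np.array([
    [2.0, 1.0],  # 2 copies of model A, 1 of model B -> 9000 MB observed
    [1.0, 0.0],  # 1 copy of model A alone           -> 3100 MB observed
    [0.0, 2.0],  # 2 copies of model B alone         -> 5800 MB observed
])
b = np.array([9000.0, 3100.0, 5800.0])

per_copy_cost_mb, residual = nnls(A, b)
print(per_copy_cost_mb)  # non-negative least-squares estimate per model copy
```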
