Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping closed pull request #877: [QDP] Numpy Input Speed & Memory Improvements
URL: https://github.com/apache/mahout/pull/877

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773382500

> You might want to also monitor the memory spike, since that is your primary goal, and check whether it really matters, because this runs on the CPU and RAM can be offloaded to SSD. (I'm not sure if that's possible, though; I'm not familiar with offloading strategies. Offloading would slow things down because it adds another I/O bound.)

Just checked the memory spike; it isn't better:

```
main: Maximum resident set size 838,872 kB (~819.3 MiB)
PR:   Maximum resident set size 838,716 kB (~819.1 MiB)
```

Closing this PR.
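For context on the numbers above: "Maximum resident set size" is the peak-RSS line reported by GNU `/usr/bin/time -v`. On Unix, the same figure can be sampled from inside a Python process with only the standard library; a minimal sketch (not the benchmark harness used in this PR):

```python
import resource

def peak_rss_kb() -> int:
    """Return this process's peak resident set size.

    ru_maxrss is in kilobytes on Linux (bytes on macOS), matching the
    'Maximum resident set size' line printed by GNU time -v.
    """
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

# Allocate ~80 MB (zero-filled, so the pages are actually touched),
# then read the peak again; the peak can only grow.
before = peak_rss_kb()
blob = bytearray(80 * 1024 * 1024)
after = peak_rss_kb()
```

Because `ru_maxrss` is a high-water mark, comparing it before and after a workload shows whether that workload raised the process's peak, which is what the main-vs-PR comparison above is measuring.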
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
ryankert01 commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773287101

You might want to also monitor the memory spike, since that is your primary goal, and check whether it really matters, because this runs on the CPU and RAM can be offloaded to SSD.
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773246333

The improvement isn't significant. Should I close this?
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773241275

Before:
```
== NUMPY I/O + ENCODING BENCHMARK ==
Qubits: 10
Sample size: 1024 elements
Number of samples: 1
Total data: 78.12 MB
Frameworks: mahout, pennylane

Generating test data...
Saving to /tmp/tmp0rf7j7p6.npy...
File size: 78.13 MB

[Mahout + NumPy] Loading and encoding...
  Total Time (I/O + Encode): 0.0581 s
  Throughput: 172165.2 samples/sec
  Average per sample: 0.01 ms

[PennyLane + NumPy] Loading and encoding...
  Total Time (I/O + Encode): 2.8481 s
  Throughput: 3511.2 samples/sec
  Average per sample: 0.28 ms

== SUMMARY ==
Framework   Time (s)  Throughput  Avg/Sample
--
Mahout      0.0581    172165.2    0.01
PennyLane   2.8481    3511.2      0.28

-- SPEEDUP COMPARISON --
Mahout vs PennyLane: 49.03x
Time reduction: 49.03x faster

Cleaned up temporary file: /tmp/tmp0rf7j7p6.npy
== BENCHMARK COMPLETE ==
```

After:
```
== NUMPY I/O + ENCODING BENCHMARK ==
Qubits: 10
Sample size: 1024 elements
Number of samples: 1
Total data: 78.12 MB
Frameworks: mahout, pennylane

Generating test data...
Saving to /tmp/tmpq898jgel.npy...
File size: 78.13 MB

[Mahout + NumPy] Loading and encoding...
  Total Time (I/O + Encode): 0.0574 s
  Throughput: 174195.3 samples/sec
  Average per sample: 0.01 ms

[PennyLane + NumPy] Loading and encoding...
  Total Time (I/O + Encode): 2.8587 s
  Throughput: 3498.1 samples/sec
  Average per sample: 0.29 ms

== SUMMARY ==
Framework   Time (s)  Throughput  Avg/Sample
--
Mahout      0.0574    174195.3    0.01
PennyLane   2.8587    3498.1      0.29

-- SPEEDUP COMPARISON --
Mahout vs PennyLane: 49.80x
Time reduction: 49.80x faster

Cleaned up temporary file: /tmp/tmpq898jgel.npy
== BENCHMARK COMPLETE ==
```
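As a sanity check on the summaries above, the speedup figures follow directly from the two total times (the printed 49.03x presumably comes from unrounded timings); a small sketch recomputing them:

```python
def speedup(fast_s: float, slow_s: float) -> float:
    """Ratio of the slower framework's total time to the faster one's."""
    return slow_s / fast_s

# Totals from the 'Before' run: Mahout 0.0581 s, PennyLane 2.8481 s.
before = speedup(0.0581, 2.8481)
# Totals from the 'After' run: Mahout 0.0574 s, PennyLane 2.8587 s.
after = speedup(0.0574, 2.8587)
```

The two runs land within a fraction of a percent of each other, which is why the thread concludes the change makes no meaningful difference.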
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708631447
qdp/qdp-core/src/readers/numpy.rs:
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+    file: File,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+    /// Create a new streaming NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
+                    path.display(),
+                    e
+                )));
+            }
+            Ok(true) => {}
+        }
+
+        let mut file = File::open(path)
+            .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: {}", e)))?;
+        let header = read_npy_header(path, &mut file)?;
+
+        Ok(Self {
+            file,
+            header,
+            row_cursor: 0,
+            column_buf: Vec::new(),
+        })
+    }
+}
+
+impl DataReader for NumpyStreamingReader {
+    fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+        let total_elements = self.header.total_elements();
+        let mut data = vec![0.0; total_elements];
+        let mut written = 0;
+        while written < total_elements {
+            let n = self.read_chunk(&mut data[written..])?;
+            if n == 0 {
+                break;
+            }
+            written += n;
+        }
+        if written != total_elements {
+            data.truncate(written);
+        }
+
+        Ok((data, self.header.num_samples, self.header.sample_size))
+    }
+
+    fn get_sample_size(&self) -> Option<usize> {
+        Some(self.header.sample_size)
+    }
+
+    fn get_num_samples(&self) -> Option<usize> {
+        Some(self.header.num_samples)
+    }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+    fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        if self.row_cursor >= self.header.num_samples {
+            return Ok(0);
+        }
+
+        let sample_size = self.header.sample_size;
+        let max_rows = buffer.len() / sample_size;
+        if max_rows == 0 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Buffer too small for one sample (need {} elements)",
+                sample_size
+            )));
+        }
+
+        let remaining_rows = self.header.num_samples - self.row_cursor;
+        let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+        let elem_count = rows_to_read * sample_size;
+
+        if !self.header.fortran_order {
+            let offset = self.header.data_offset
+                + (self.row_cursor * sample_size * std::mem::size_of::<f64>()) as u64;
+            read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+        } else {
+            if self.column_buf.len() < rows_to_read {
+                self.column_buf.resize(rows_to_read, 0.0);
+            }
+            for col in 0..sample_size {
+                let offset = self.header.data_offset
+                    + ((col * self.header.num_samples + self.row_cursor)
+                        * std::mem::size_of::<f64>()) as u64;
+                let column = &mut self.column_buf[..rows_to_read];
+                read_f64s_at(&mut self.file, offset, column)?;
+                for row in 0..rows_to_read {
+                    buffer[row * sample_size + col] = column[row];
+                }
+            }
+        }
+
+        self.row_cursor += rows_to_read;
+        Ok(elem_count)
+    }
+
+    fn total_rows(&self) -> usize {
+        self.header.num_samples
+    }
+}
+
+/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Maps the file into memory and streams slices without an extra read + flatten pass.
+pub struct NumpyMmapReader {
+    mmap: Mmap,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyMmapReader {
+    /// Create a new memory-mapped NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
+
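For context, `read_npy_header` (defined elsewhere in this file) parses the `.npy` container that both readers share. The version 1.0 layout is: 6-byte magic `\x93NUMPY`, two version bytes, a little-endian u16 header length, then an ASCII Python-dict header padded with spaces and a trailing newline so the data starts 16-byte aligned. A self-contained sketch of building and parsing such a header, stdlib only (the helper names here are illustrative, not the PR's API):

```python
import struct

MAGIC = b"\x93NUMPY"

def build_npy_v1(header_dict: str, payload: bytes) -> bytes:
    """Hand-build a version 1.0 .npy file: magic, version bytes,
    little-endian u16 header length, then the dict header padded so
    magic + version + length + header is a multiple of 16 bytes."""
    header = header_dict.encode("latin-1")
    pad = 16 - ((6 + 2 + 2 + len(header) + 1) % 16)
    header = header + b" " * (pad % 16) + b"\n"
    return MAGIC + bytes([1, 0]) + struct.pack("<H", len(header)) + header + payload

def parse_npy_header(buf: bytes):
    """Minimal parse of the fields a reader needs: dtype descriptor,
    fortran_order flag, shape, and the offset where data begins."""
    assert buf[:6] == MAGIC, "not a .npy file"
    assert (buf[6], buf[7]) == (1, 0), "only v1.0 handled in this sketch"
    (hlen,) = struct.unpack("<H", buf[8:10])
    # The header is a Python dict literal; eval is fine for this sketch.
    meta = eval(buf[10:10 + hlen].decode("latin-1"))
    return meta["descr"], meta["fortran_order"], meta["shape"], 10 + hlen

raw = build_npy_v1("{'descr': '<f8', 'fortran_order': False, 'shape': (2, 3), }",
                   struct.pack("<6d", *range(6)))
descr, fortran, shape, offset = parse_npy_header(raw)
```

The 16-byte alignment is why a reader can compute `data_offset` once and then do pure positional reads, as the quoted `read_chunk` does.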
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708623801
qdp/qdp-core/src/readers/numpy.rs:
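The Fortran-order branch of `read_chunk` quoted earlier in this thread reads one column at a time and scatters it into row-major output via `buffer[row * sample_size + col] = column[row]`. The same transpose logic in a pure-Python sketch:

```python
def gather_fortran_rows(flat_col_major, num_samples, sample_size,
                        row_cursor, rows_to_read):
    """Extract rows [row_cursor, row_cursor + rows_to_read) from a
    column-major (Fortran-order) flat array into row-major order,
    mirroring the column-by-column loop in NumpyStreamingReader."""
    buffer = [0.0] * (rows_to_read * sample_size)
    for col in range(sample_size):
        # In the Rust code this slice is one positional file read.
        start = col * num_samples + row_cursor
        column = flat_col_major[start:start + rows_to_read]
        for row in range(rows_to_read):
            buffer[row * sample_size + col] = column[row]
    return buffer

# 3 samples x 2 features, stored column-major:
# the rows are (1, 10), (2, 20), (3, 30).
col_major = [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]
rows = gather_fortran_rows(col_major, num_samples=3, sample_size=2,
                           row_cursor=1, rows_to_read=2)
# rows == [2.0, 20.0, 3.0, 30.0]
```

Each column of the requested row window is contiguous on disk in Fortran order, so the reader trades one large read for `sample_size` small positional reads plus this in-memory transpose.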
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708620303
qdp/qdp-core/src/readers/numpy.rs:
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708586279
qdp/qdp-core/src/readers/numpy.rs:
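The `read_batch` implementation quoted in this thread drains `read_chunk` until it returns 0, then truncates the preallocated buffer on a short read. That loop shape, sketched in Python with a stubbed chunk source standing in for the file:

```python
def read_batch(total_elements, chunks):
    """Mirror of the quoted read_batch loop: fill a preallocated
    buffer from successive chunks, stop on an empty chunk, and
    truncate if the source ran short of the declared size."""
    data = [0.0] * total_elements
    written = 0
    chunk_iter = iter(chunks)
    while written < total_elements:
        chunk = next(chunk_iter, [])
        n = len(chunk)
        if n == 0:
            break  # source exhausted (read_chunk returned Ok(0))
        data[written:written + n] = chunk
        written += n
    if written != total_elements:
        del data[written:]  # like Vec::truncate(written)
    return data

full = read_batch(4, [[1.0, 2.0], [3.0, 4.0]])   # exact fit
short = read_batch(6, [[1.0, 2.0], [3.0]])       # source runs short
```

Preallocating to the header's declared element count and truncating only on shortfall avoids repeated reallocation while still tolerating a file whose data section is smaller than its header claims.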
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
ryankert01 commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3772428139

Could you provide before-and-after benchmark results, if any? Thanks!
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707892631
qdp/qdp-core/src/readers/numpy.rs:
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707425519
qdp/qdp-core/src/readers/numpy.rs:
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707425519
##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+file: File,
+header: NpyHeader,
+row_cursor: usize,
+column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+/// Create a new streaming NumPy reader.
+pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+let path = path.as_ref();
+
+match path.try_exists() {
+Ok(false) => {
+return Err(MahoutError::Io(format!(
+"NumPy file not found: {}",
+path.display()
+)));
+}
+Err(e) => {
+return Err(MahoutError::Io(format!(
+"Failed to check if NumPy file exists at {}: {}",
+path.display(),
+e
+)));
+}
+Ok(true) => {}
+}
+
+let mut file = File::open(path)
+.map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: {}", e)))?;
+let header = read_npy_header(path, &mut file)?;
+
+Ok(Self {
+file,
+header,
+row_cursor: 0,
+column_buf: Vec::new(),
+})
+}
+}
+
+impl DataReader for NumpyStreamingReader {
+fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+let total_elements = self.header.total_elements();
+let mut data = vec![0.0; total_elements];
+let mut written = 0;
+while written < total_elements {
+let n = self.read_chunk(&mut data[written..])?;
+if n == 0 {
+break;
+}
+written += n;
+}
+if written != total_elements {
+data.truncate(written);
+}
+
+Ok((data, self.header.num_samples, self.header.sample_size))
+}
+
+fn get_sample_size(&self) -> Option<usize> {
+Some(self.header.sample_size)
+}
+
+fn get_num_samples(&self) -> Option<usize> {
+Some(self.header.num_samples)
+}
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+if self.row_cursor >= self.header.num_samples {
+return Ok(0);
+}
+
+let sample_size = self.header.sample_size;
+let max_rows = buffer.len() / sample_size;
+if max_rows == 0 {
+return Err(MahoutError::InvalidInput(format!(
+"Buffer too small for one sample (need {} elements)",
+sample_size
+)));
+}
+
+let remaining_rows = self.header.num_samples - self.row_cursor;
+let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+let elem_count = rows_to_read * sample_size;
+
+if !self.header.fortran_order {
+let offset = self.header.data_offset
++ (self.row_cursor * sample_size * std::mem::size_of::<f64>()) as u64;
+read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+} else {
+if self.column_buf.len() < rows_to_read {
+self.column_buf.resize(rows_to_read, 0.0);
+}
+for col in 0..sample_size {
+let offset = self.header.data_offset
++ ((col * self.header.num_samples + self.row_cursor)
+* std::mem::size_of::<f64>()) as u64;
+let column = &mut self.column_buf[..rows_to_read];
+read_f64s_at(&mut self.file, offset, column)?;
+for row in 0..rows_to_read {
+buffer[row * sample_size + col] = column[row];
+}
+}
+}
+
+self.row_cursor += rows_to_read;
+Ok(elem_count)
+}
+
+fn total_rows(&self) -> usize {
+self.header.num_samples
+}
+}
+
+/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Maps the file into memory and streams slices without an extra read + flatten pass.
+pub struct NumpyMmapReader {
+mmap: Mmap,
+header: NpyHeader,
+row_cursor: usize,
+column_buf: Vec<f64>,
+}
+
+impl NumpyMmapReader {
+/// Create a new memory-mapped NumPy reader.
+pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+let path = path.as_ref();
+
+match path.try_exists() {
+Ok(false) => {
+return Err(MahoutError::Io(format!(
+"NumPy file not found: {}",
+path.display()
+)));
+}
+Err(e) => {
+return Err(MahoutError::Io(format!(
+"Failed to check if NumPy file exists at {}: {}",
+
Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]
Copilot commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707315470
##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+file: File,
+header: NpyHeader,
+row_cursor: usize,
+column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+/// Create a new streaming NumPy reader.
+pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+let path = path.as_ref();
+
+match path.try_exists() {
+Ok(false) => {
+return Err(MahoutError::Io(format!(
+"NumPy file not found: {}",
+path.display()
+)));
+}
+Err(e) => {
+return Err(MahoutError::Io(format!(
+"Failed to check if NumPy file exists at {}: {}",
+path.display(),
+e
+)));
+}
+Ok(true) => {}
+}
+
+let mut file = File::open(path)
+.map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: {}", e)))?;
+let header = read_npy_header(path, &mut file)?;
+
+Ok(Self {
+file,
+header,
+row_cursor: 0,
+column_buf: Vec::new(),
+})
+}
+}
+
+impl DataReader for NumpyStreamingReader {
+fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+let total_elements = self.header.total_elements();
+let mut data = vec![0.0; total_elements];
+let mut written = 0;
+while written < total_elements {
+let n = self.read_chunk(&mut data[written..])?;
+if n == 0 {
+break;
+}
+written += n;
+}
+if written != total_elements {
+data.truncate(written);
+}
+
+Ok((data, self.header.num_samples, self.header.sample_size))
+}
+
+fn get_sample_size(&self) -> Option<usize> {
+Some(self.header.sample_size)
+}
+
+fn get_num_samples(&self) -> Option<usize> {
+Some(self.header.num_samples)
+}
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+if self.row_cursor >= self.header.num_samples {
+return Ok(0);
+}
+
+let sample_size = self.header.sample_size;
+let max_rows = buffer.len() / sample_size;
+if max_rows == 0 {
+return Err(MahoutError::InvalidInput(format!(
+"Buffer too small for one sample (need {} elements)",
+sample_size
+)));
+}
+
+let remaining_rows = self.header.num_samples - self.row_cursor;
+let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+let elem_count = rows_to_read * sample_size;
+
+if !self.header.fortran_order {
+let offset = self.header.data_offset
++ (self.row_cursor * sample_size * std::mem::size_of::<f64>()) as u64;
+read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+} else {
+if self.column_buf.len() < rows_to_read {
+self.column_buf.resize(rows_to_read, 0.0);
+}
+for col in 0..sample_size {
+let offset = self.header.data_offset
++ ((col * self.header.num_samples + self.row_cursor)
+* std::mem::size_of::<f64>()) as u64;
+let column = &mut self.column_buf[..rows_to_read];
+read_f64s_at(&mut self.file, offset, column)?;
+for row in 0..rows_to_read {
+buffer[row * sample_size + col] = column[row];
+}
+}
Review Comment:
The Fortran-order reading path performs one seek and read per column (line
536), resulting in sample_size seek operations. For wide matrices with many
columns, this could cause significant performance degradation compared to
C-order reading. Consider documenting this performance characteristic in the
struct's documentation, or implementing a batched column read strategy to
reduce the number of seeks.
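The batched column read suggested above could look roughly like the sketch below. It is not part of the PR; the function name and the raw-byte decoding are illustrative assumptions (it does not reuse the PR's `read_f64s_at` helper, and it assumes a little-endian `<f8` payload). The idea: when a chunk spans every row, the whole Fortran-order payload is one contiguous block on disk, so a single seek + read replaces the `sample_size` per-column seeks, followed by an in-memory transpose.

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

/// Read all columns of a little-endian Fortran-order f64 payload with a
/// single seek + read, then transpose into the row-major `out` slice.
/// Hypothetical sketch; `out` must hold num_samples * sample_size elements.
fn read_columns_batched(
    file: &mut File,
    data_offset: u64,
    num_samples: usize,
    sample_size: usize,
    out: &mut [f64],
) -> std::io::Result<()> {
    // One contiguous read of the entire column-major payload.
    let mut raw = vec![0u8; num_samples * sample_size * 8];
    file.seek(SeekFrom::Start(data_offset))?;
    file.read_exact(&mut raw)?; // one syscall instead of one per column
    // Transpose: element (row, col) lives at column-major index
    // col * num_samples + row in the raw payload.
    for col in 0..sample_size {
        for row in 0..num_samples {
            let i = (col * num_samples + row) * 8;
            let v = f64::from_le_bytes(raw[i..i + 8].try_into().unwrap());
            out[row * sample_size + col] = v;
        }
    }
    Ok(())
}
```

Partial chunks (where `rows_to_read < num_samples`) would still need per-column reads, so this only helps the full-pass case; documenting the per-column seek cost, as suggested, may be the simpler fix.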
##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -18,13 +18,286 @@
//!
//! Provides support for reading .npy files containing 2D float64 arrays.
-use std::path::Path;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use memmap2::Mmap;
use ndarray::Array2;
use ndarray_npy::ReadNpyError;
use crate::error::{MahoutError, Result};
-use crate::reader::DataReader;
+use crate::reader::{DataReader, StreamingDataReader};
+
+const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
+
+#[derive(Clone, Debug)]
+struct NpyHeader {
+fortran_order
