Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping closed pull request #877: [QDP] Numpy Input Speed & Memory Improvements
URL: https://github.com/apache/mahout/pull/877


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773382500

   > You might also want to monitor the memory spike, since that is your primary goal, and check whether it really matters, given that this runs on the CPU and RAM can be offloaded to SSD. (I'm not sure that's possible, though; I'm not familiar with offloading strategies, and offloading would slow things down because it adds another I/O bound.)
   
   Just checked the memory spike; it isn't any better:
   ```
   main: Maximum resident set size 838,872 kB (~819.3 MiB)
   PR:   Maximum resident set size 838,716 kB (~819.1 MiB)
   ```
   Closing this PR.
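   
   For reference, "Maximum resident set size" figures like the ones above come from peak-RSS accounting (e.g. GNU time's `-v` output). A minimal Rust sketch of reading the same high-water mark in-process on Linux, assuming a `/proc` filesystem; `peak_rss_kb` is a hypothetical helper, not part of this PR:
   ```
   use std::fs;
   
   /// Hypothetical helper (not in this PR): read the process's peak resident
   /// set size in kB from /proc/self/status (the VmHWM field, Linux-only).
   fn peak_rss_kb() -> Option<u64> {
       let status = fs::read_to_string("/proc/self/status").ok()?;
       status
           .lines()
           .find(|line| line.starts_with("VmHWM:"))
           .and_then(|line| line.split_whitespace().nth(1))
           .and_then(|kb| kb.parse().ok())
   }
   
   fn main() {
       // Touch 64 MiB so the high-water mark visibly moves.
       let buf = vec![1u8; 64 * 1024 * 1024];
       println!("touched {} bytes", buf.len());
       if let Some(kb) = peak_rss_kb() {
           println!("Maximum resident set size: {} kB", kb);
       }
   }
   ```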





Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


ryankert01 commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773287101

   You might also want to monitor the memory spike, since that is your primary goal, and check whether it really matters, given that this runs on the CPU and RAM can be offloaded to SSD.





Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773246333

   The improvement isn't much better; should I close this?





Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3773241275

   Before:
   ```
   ========================================
   NUMPY I/O + ENCODING BENCHMARK
   ========================================
   Qubits: 10
   Sample size: 1024 elements
   Number of samples: 10000
   Total data: 78.12 MB
   Frameworks: mahout, pennylane
   
   Generating test data...
   Saving to /tmp/tmp0rf7j7p6.npy...
   File size: 78.13 MB
   
   [Mahout + NumPy] Loading and encoding...
     Total Time (I/O + Encode): 0.0581 s
     Throughput: 172165.2 samples/sec
     Average per sample: 0.01 ms
   
   [PennyLane + NumPy] Loading and encoding...
     Total Time (I/O + Encode): 2.8481 s
     Throughput: 3511.2 samples/sec
     Average per sample: 0.28 ms
   
   ========================================
   SUMMARY
   ========================================
   Framework    Time (s)   Throughput   Avg/Sample
   ------------------------------------------------
   Mahout       0.0581     172165.2     0.01
   PennyLane    2.8481     3511.2       0.28
   
   ------------------------------------------------
   SPEEDUP COMPARISON
   ------------------------------------------------
   Mahout vs PennyLane: 49.03x
   Time reduction: 49.03x faster
   
   Cleaned up temporary file: /tmp/tmp0rf7j7p6.npy
   
   ========================================
   BENCHMARK COMPLETE
   ========================================
   ```
   
   After:
   ```
   ========================================
   NUMPY I/O + ENCODING BENCHMARK
   ========================================
   Qubits: 10
   Sample size: 1024 elements
   Number of samples: 10000
   Total data: 78.12 MB
   Frameworks: mahout, pennylane
   
   Generating test data...
   Saving to /tmp/tmpq898jgel.npy...
   File size: 78.13 MB
   
   [Mahout + NumPy] Loading and encoding...
     Total Time (I/O + Encode): 0.0574 s
     Throughput: 174195.3 samples/sec
     Average per sample: 0.01 ms
   
   [PennyLane + NumPy] Loading and encoding...
     Total Time (I/O + Encode): 2.8587 s
     Throughput: 3498.1 samples/sec
     Average per sample: 0.29 ms
   
   ========================================
   SUMMARY
   ========================================
   Framework    Time (s)   Throughput   Avg/Sample
   ------------------------------------------------
   Mahout       0.0574     174195.3     0.01
   PennyLane    2.8587     3498.1       0.29
   
   ------------------------------------------------
   SPEEDUP COMPARISON
   ------------------------------------------------
   Mahout vs PennyLane: 49.80x
   Time reduction: 49.80x faster
   
   Cleaned up temporary file: /tmp/tmpq898jgel.npy
   
   ========================================
   BENCHMARK COMPLETE
   ========================================
   ```
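   
   For anyone reproducing these numbers from Rust, a minimal timing sketch using `std::time::Instant`; `time_io_encode` and its closure are illustrative, not the harness that produced the output above:
   ```
   use std::time::Instant;
   
   /// Illustrative timing wrapper (not the benchmark harness above): time a
   /// closure that performs I/O + encoding and returns how many samples it
   /// processed, then report throughput the way the harness does.
   fn time_io_encode<F: FnMut() -> usize>(label: &str, mut run: F) {
       let start = Instant::now();
       let num_samples = run();
       let secs = start.elapsed().as_secs_f64();
       println!("[{label}] Total Time (I/O + Encode): {secs:.4} s");
       println!("  Throughput: {:.1} samples/sec", num_samples as f64 / secs);
       println!("  Average per sample: {:.2} ms", secs * 1000.0 / num_samples as f64);
   }
   ```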





Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708631447


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
 }
 }
 
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+    file: File,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+    /// Create a new streaming NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
+                    path.display(),
+                    e
+                )));
+            }
+            Ok(true) => {}
+        }
+
+        let mut file = File::open(path)
+            .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: {}", e)))?;
+        let header = read_npy_header(path, &mut file)?;
+
+        Ok(Self {
+            file,
+            header,
+            row_cursor: 0,
+            column_buf: Vec::new(),
+        })
+    }
+}
+
+impl DataReader for NumpyStreamingReader {
+    fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+        let total_elements = self.header.total_elements();
+        let mut data = vec![0.0; total_elements];
+        let mut written = 0;
+        while written < total_elements {
+            let n = self.read_chunk(&mut data[written..])?;
+            if n == 0 {
+                break;
+            }
+            written += n;
+        }
+        if written != total_elements {
+            data.truncate(written);
+        }
+
+        Ok((data, self.header.num_samples, self.header.sample_size))
+    }
+
+    fn get_sample_size(&self) -> Option<usize> {
+        Some(self.header.sample_size)
+    }
+
+    fn get_num_samples(&self) -> Option<usize> {
+        Some(self.header.num_samples)
+    }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+    fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        if self.row_cursor >= self.header.num_samples {
+            return Ok(0);
+        }
+
+        let sample_size = self.header.sample_size;
+        let max_rows = buffer.len() / sample_size;
+        if max_rows == 0 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Buffer too small for one sample (need {} elements)",
+                sample_size
+            )));
+        }
+
+        let remaining_rows = self.header.num_samples - self.row_cursor;
+        let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+        let elem_count = rows_to_read * sample_size;
+
+        if !self.header.fortran_order {
+            let offset = self.header.data_offset
+                + (self.row_cursor * sample_size * std::mem::size_of::<f64>()) as u64;
+            read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+        } else {
+            if self.column_buf.len() < rows_to_read {
+                self.column_buf.resize(rows_to_read, 0.0);
+            }
+            for col in 0..sample_size {
+                let offset = self.header.data_offset
+                    + ((col * self.header.num_samples + self.row_cursor)
+                        * std::mem::size_of::<f64>()) as u64;
+                let column = &mut self.column_buf[..rows_to_read];
+                read_f64s_at(&mut self.file, offset, column)?;
+                for row in 0..rows_to_read {
+                    buffer[row * sample_size + col] = column[row];
+                }
+            }
+        }
+
+        self.row_cursor += rows_to_read;
+        Ok(elem_count)
+    }
+
+    fn total_rows(&self) -> usize {
+        self.header.num_samples
+    }
+}
+
+/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Maps the file into memory and streams slices without an extra read + flatten pass.
+pub struct NumpyMmapReader {
+    mmap: Mmap,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyMmapReader {
+    /// Create a new memory-mapped NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
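
For context, a minimal sketch of how the streaming API quoted above would be driven, reading in 256-row chunks instead of materializing the whole array; the file path and the commented-out encode step are placeholders, not part of the PR:
```
// Assumes NumpyStreamingReader, the StreamingDataReader and DataReader traits,
// and the crate-level Result from the diff above are in scope.
fn stream_example() -> Result<()> {
    let mut reader = NumpyStreamingReader::new("data/train.npy")?;
    let sample_size = reader.get_sample_size().unwrap_or(1);
    // Buffer sized in whole rows: read_chunk() errors if it cannot fit one sample.
    let mut buf = vec![0.0f64; 256 * sample_size];
    loop {
        let n = reader.read_chunk(&mut buf)?; // elements read, always whole rows
        if n == 0 {
            break; // all rows consumed
        }
        let rows = n / sample_size;
        // encode_rows(&buf[..n], rows, sample_size); // hypothetical downstream step
        let _ = rows;
    }
    Ok(())
}
```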

Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708623801


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {

Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708620303


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {


Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2708586279


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {

Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


ryankert01 commented on PR #877:
URL: https://github.com/apache/mahout/pull/877#issuecomment-3772428139

Could you provide before-and-after benchmark results, if any? Thanks!





Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


400Ping commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707892631


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {

Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


viiccwen commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707425519


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {


Re: [PR] [QDP] Numpy Input Speed & Memory Improvements [mahout]

2026-01-20 Thread via GitHub


Copilot commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707315470


##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
 }
 }
 
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+    file: File,
+    header: NpyHeader,
+    row_cursor: usize,
+    column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+    /// Create a new streaming NumPy reader.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref();
+
+        match path.try_exists() {
+            Ok(false) => {
+                return Err(MahoutError::Io(format!(
+                    "NumPy file not found: {}",
+                    path.display()
+                )));
+            }
+            Err(e) => {
+                return Err(MahoutError::Io(format!(
+                    "Failed to check if NumPy file exists at {}: {}",
+                    path.display(),
+                    e
+                )));
+            }
+            Ok(true) => {}
+        }
+
+        let mut file = File::open(path)
+            .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file: {}", e)))?;
+        let header = read_npy_header(path, &mut file)?;
+
+        Ok(Self {
+            file,
+            header,
+            row_cursor: 0,
+            column_buf: Vec::new(),
+        })
+    }
+}
+
+impl DataReader for NumpyStreamingReader {
+    fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+        let total_elements = self.header.total_elements();
+        let mut data = vec![0.0; total_elements];
+        let mut written = 0;
+        while written < total_elements {
+            let n = self.read_chunk(&mut data[written..])?;
+            if n == 0 {
+                break;
+            }
+            written += n;
+        }
+        if written != total_elements {
+            data.truncate(written);
+        }
+
+        Ok((data, self.header.num_samples, self.header.sample_size))
+    }
+
+    fn get_sample_size(&self) -> Option<usize> {
+        Some(self.header.sample_size)
+    }
+
+    fn get_num_samples(&self) -> Option<usize> {
+        Some(self.header.num_samples)
+    }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+    fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        if self.row_cursor >= self.header.num_samples {
+            return Ok(0);
+        }
+
+        let sample_size = self.header.sample_size;
+        let max_rows = buffer.len() / sample_size;
+        if max_rows == 0 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Buffer too small for one sample (need {} elements)",
+                sample_size
+            )));
+        }
+
+        let remaining_rows = self.header.num_samples - self.row_cursor;
+        let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+        let elem_count = rows_to_read * sample_size;
+
+        if !self.header.fortran_order {
+            let offset = self.header.data_offset
+                + (self.row_cursor * sample_size * std::mem::size_of::<f64>()) as u64;
+            read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+        } else {
+            if self.column_buf.len() < rows_to_read {
+                self.column_buf.resize(rows_to_read, 0.0);
+            }
+            for col in 0..sample_size {
+                let offset = self.header.data_offset
+                    + ((col * self.header.num_samples + self.row_cursor)
+                        * std::mem::size_of::<f64>()) as u64;
+                let column = &mut self.column_buf[..rows_to_read];
+                read_f64s_at(&mut self.file, offset, column)?;
+                for row in 0..rows_to_read {
+                    buffer[row * sample_size + col] = column[row];
+                }
+            }

Review Comment:
   The Fortran-order reading path performs one seek and one read per column (line 536), resulting in `sample_size` seek operations. For wide matrices with many columns, this could degrade performance significantly compared to the C-order path. Consider documenting this performance characteristic in the struct's documentation, or implementing a batched column read strategy to reduce the number of seeks; one possible shape of such a strategy is sketched below.
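
   To make the batched idea concrete, a hedged sketch (not code from this PR): adjacent Fortran-order columns are contiguous on disk, so one seek can fetch a group of whole columns and the row scatter can then happen in memory, trading extra bytes read for far fewer seeks. The `read_f64s_at` signature and the `COLS_PER_READ` group size are assumptions:

```
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

/// Stand-in for the PR's read_f64s_at helper (assumed signature): seek to
/// `offset` and fill `out` with little-endian f64s.
fn read_f64s_at(file: &mut File, offset: u64, out: &mut [f64]) -> std::io::Result<()> {
    file.seek(SeekFrom::Start(offset))?;
    let mut bytes = vec![0u8; out.len() * 8];
    file.read_exact(&mut bytes)?;
    for (dst, src) in out.iter_mut().zip(bytes.chunks_exact(8)) {
        *dst = f64::from_le_bytes(src.try_into().unwrap());
    }
    Ok(())
}

/// Batched Fortran-order read: one seek per GROUP of adjacent columns
/// (contiguous on disk) instead of one seek per column. Reads whole columns,
/// so it costs extra bandwidth when rows_to_read < num_samples.
/// `buffer` must hold at least rows_to_read * sample_size elements.
fn read_fortran_batched(
    file: &mut File,
    data_offset: u64,
    num_samples: usize,
    sample_size: usize,
    row_cursor: usize,
    rows_to_read: usize,
    buffer: &mut [f64],
) -> std::io::Result<()> {
    const COLS_PER_READ: usize = 64; // tuning knob, not from the PR
    let mut column_buf = vec![0.0f64; COLS_PER_READ * num_samples];
    for col_start in (0..sample_size).step_by(COLS_PER_READ) {
        let cols = COLS_PER_READ.min(sample_size - col_start);
        let elems = cols * num_samples;
        // Columns col_start..col_start+cols are one contiguous file region.
        let offset = data_offset + (col_start * num_samples * 8) as u64;
        read_f64s_at(file, offset, &mut column_buf[..elems])?;
        for c in 0..cols {
            let col_base = c * num_samples + row_cursor;
            for row in 0..rows_to_read {
                buffer[row * sample_size + col_start + c] = column_buf[col_base + row];
            }
        }
    }
    Ok(())
}
```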



##
qdp/qdp-core/src/readers/numpy.rs:
##
@@ -18,13 +18,286 @@
 //!
 //! Provides support for reading .npy files containing 2D float64 arrays.
 
-use std::path::Path;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
 
+use memmap2::Mmap;
 use ndarray::Array2;
 use ndarray_npy::ReadNpyError;
 
 use crate::error::{MahoutError, Result};
-use crate::reader::DataReader;
+use crate::reader::{DataReader, StreamingDataReader};
+
+const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
+
+#[derive(Clone, Debug)]
+struct NpyHeader {
+    fortran_order
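
For context on the header struct being introduced here: a `.npy` v1.0 file starts with the 6-byte magic `\x93NUMPY`, a two-byte version, and a little-endian u16 header length, followed by an ASCII Python-dict literal giving `descr`, `fortran_order`, and `shape`. A minimal probe under those assumptions (error handling simplified; `probe_npy_header` is illustrative, not the PR's `read_npy_header`):
```
use std::fs::File;
use std::io::{Error, ErrorKind, Read};

const NPY_MAGIC_BYTES: &[u8] = b"\x93NUMPY"; // same magic as NPY_MAGIC above

/// Minimal .npy v1.0 header probe: returns the raw header dict text and the
/// byte offset where the array data begins. v1.0 stores the header length as
/// a little-endian u16; v2.x uses a u32 instead.
fn probe_npy_header(path: &str) -> std::io::Result<(String, u64)> {
    let mut file = File::open(path)?;
    let mut fixed = [0u8; 10]; // magic (6) + version (2) + header length (2)
    file.read_exact(&mut fixed)?;
    if &fixed[..6] != NPY_MAGIC_BYTES {
        return Err(Error::new(ErrorKind::InvalidData, "not a .npy file"));
    }
    let header_len = u16::from_le_bytes([fixed[8], fixed[9]]) as usize;
    let mut header = vec![0u8; header_len];
    file.read_exact(&mut header)?;
    // e.g. {'descr': '<f8', 'fortran_order': False, 'shape': (10000, 1024), }
    Ok((String::from_utf8_lossy(&header).into_owned(), (10 + header_len) as u64))
}
```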