threatpointer opened a new pull request, #16:
URL: https://github.com/apache/iotdb-mcp-server/pull/16

   # Fix filename validation in export functions
   
   ## Overview
   
   This PR improves the filename handling in the export functions 
(`export_query` and `export_table_query`) to ensure robust validation and 
prevent potential issues with malformed filenames.
   
   ## Changes
   
   ### Core Implementation
   - Added `sanitize_filename()` function with comprehensive validation:
     - Regex-based character validation (allows alphanumeric, underscore, 
hyphen, dot)
     - Path separator rejection
     - Path traversal prevention
     - Basename extraction for safety
     - Edge case handling (empty strings, dots)
     - Boundary enforcement (ensures files stay within export directory)
     - Maximum filename length validation (255 characters)
   
   ### Modified Functions
   - `export_query()`: Applied filename sanitization before writing CSV/Excel 
files
   - `export_table_query()`: Applied filename sanitization before writing 
CSV/Excel files
   
   ## Validation & Testing
   
   Comprehensive test suite with **32 test cases** covering all validation 
aspects:
   
   ### Test Coverage
   
   | Category | Tests | Status |
   |----------|-------|--------|
   | Path Traversal Attacks | 4 | ✓ All Pass |
   | Valid Filenames | 6 | ✓ All Pass |
   | Invalid Characters | 11 | ✓ All Pass |
   | Edge Cases | 4 | ✓ All Pass |
   | Boundary Validation | 2 | ✓ All Pass |
   | Exploit Scenarios | 5 | ✓ All Pass |
   | **TOTAL** | **32** | **✓ 100% Pass** |
   
   ### Test Results
   
   ```
   ======================================================================
   Apache IoTDB MCP Server: Fix filename validation in export functions 
Validation
   Author: Mohammed Tanveer (threatpointer)
   ======================================================================
   
   Running Path Traversal Attack Tests...
   [PASS] Simple path traversal blocked
   [PASS] Multi-level path traversal blocked
   [PASS] Path traversal with valid filename blocked
   [PASS] Absolute path attack blocked
   
   Running Valid Filename Tests...
   [PASS] Valid filename accepted: export.csv
   [PASS] Valid filename accepted: data_2024.xlsx
   [PASS] Valid filename accepted: my-file.csv
   [PASS] Valid filename accepted: test_file_123.csv
   [PASS] Valid filename accepted: UPPERCASE.CSV
   [PASS] Valid filename accepted: mixed_Case-File.123.csv
   
   Running Invalid Character Tests...
   [PASS] Invalid character rejected: file name.csv
   [PASS] Invalid character rejected: file;name.csv
   [PASS] Invalid character rejected: file:name.csv
   [PASS] Invalid character rejected: file*name.csv
   [PASS] Invalid character rejected: file?name.csv
   [PASS] Invalid character rejected: file|name.csv
   [PASS] Invalid character rejected: file<name.csv
   [PASS] Invalid character rejected: file>name.csv
   [PASS] Invalid character rejected: file"name.csv
   [PASS] Invalid character rejected: file\name.csv
   [PASS] Invalid character rejected: file/name.csv
   
   Running Edge Case Tests...
   [PASS] Empty filename rejected
   [PASS] Just dots rejected
   [PASS] Single dot rejected
   [PASS] Hidden file accepted
   
   Running Boundary Validation Tests...
   [PASS] Escape to parent directory blocked
   [PASS] Normalized path within boundary
   
   Running Exploit Scenario Tests...
   [PASS] Exploit scenario blocked: ../../../tmp/test_traversal.csv
   [PASS] Exploit scenario blocked: ../../../../Windows/Startup/backdoor.csv
   [PASS] Exploit scenario blocked: ../../../etc/cron.d/backdoor.csv
   [PASS] Exploit scenario blocked: 
../../../../../../home/user/.ssh/authorized_keys.csv
   [PASS] Exploit scenario blocked: ../../../../var/www/html/shell.csv
   
   ======================================================================
   TEST SUMMARY: 32/32 tests passed
   ======================================================================
   ```
   
   ## Complete Test Suite
   
   The following comprehensive test suite was used to validate the fix:
   
   ```python
   #!/usr/bin/env python3
   """
   Validation Test Suite for Apache IoTDB MCP Server Filename Handling
   Author: Mohammed Tanveer (threatpointer)
   Tests the sanitize_filename function to ensure proper filename validation.
   """
   
   import os
   import sys
   import tempfile
   import shutil
   from pathlib import Path
   
   # Add the src directory to the path
   sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
   
   # Import the sanitize_filename function
   from iotdb_mcp_server.server import sanitize_filename
   
   
   class TestResults:
       def __init__(self):
           self.passed = 0
           self.failed = 0
           self.errors = []
   
       def add_pass(self, test_name):
           self.passed += 1
           print(f"[PASS] {test_name}")
   
       def add_fail(self, test_name, reason):
           self.failed += 1
           self.errors.append((test_name, reason))
           print(f"[FAIL] {test_name}")
           print(f"  Reason: {reason}")
   
       def summary(self):
           total = self.passed + self.failed
           print("\n" + "=" * 70)
           print(f"TEST SUMMARY: {self.passed}/{total} tests passed")
           if self.failed > 0:
               print(f"\nFailed tests ({self.failed}):")
               for test_name, reason in self.errors:
                   print(f"  - {test_name}: {reason}")
           print("=" * 70)
           return self.failed == 0
   
   
   def test_path_traversal_attacks(temp_dir, results):
       """Test that path traversal attacks are blocked"""
   
       # Test 1: Simple path traversal
       try:
           sanitize_filename("../attack.csv", temp_dir)
           results.add_fail("Simple path traversal", "Should have raised 
ValueError")
       except ValueError as e:
           if "within export directory" in str(e) or "Invalid filename" in 
str(e):
               results.add_pass("Simple path traversal blocked")
           else:
               results.add_fail("Simple path traversal", f"Wrong error: {e}")
       except Exception as e:
           results.add_fail("Simple path traversal", f"Unexpected error: {e}")
   
       # Test 2: Multi-level path traversal
       try:
           sanitize_filename("../../../../etc/passwd.csv", temp_dir)
           results.add_fail("Multi-level path traversal", "Should have raised 
ValueError")
       except ValueError:
           results.add_pass("Multi-level path traversal blocked")
       except Exception as e:
           results.add_fail("Multi-level path traversal", f"Unexpected error: 
{e}")
   
       # Test 3: Path traversal with valid filename
       try:
           sanitize_filename("../../tmp/attack.csv", temp_dir)
           results.add_fail(
               "Path traversal with valid filename", "Should have raised 
ValueError"
           )
       except ValueError:
           results.add_pass("Path traversal with valid filename blocked")
       except Exception as e:
           results.add_fail("Path traversal with valid filename", f"Unexpected 
error: {e}")
   
       # Test 4: Absolute path attack
       try:
           if sys.platform == "win32":
               sanitize_filename("C:\\Windows\\System32\\attack.csv", temp_dir)
           else:
               sanitize_filename("/etc/passwd.csv", temp_dir)
           results.add_fail("Absolute path attack", "Should have raised 
ValueError")
       except ValueError:
           results.add_pass("Absolute path attack blocked")
       except Exception as e:
           results.add_fail("Absolute path attack", f"Unexpected error: {e}")
   
   
   def test_valid_filenames(temp_dir, results):
       """Test that valid filenames are accepted"""
   
       valid_filenames = [
           "export.csv",
           "data_2024.xlsx",
           "my-file.csv",
           "test_file_123.csv",
           "UPPERCASE.CSV",
           "mixed_Case-File.123.csv",
       ]
   
       for filename in valid_filenames:
           try:
               result = sanitize_filename(filename, temp_dir)
               # Check that the result is within the temp directory
               if 
os.path.realpath(result).startswith(os.path.realpath(temp_dir)):
                   results.add_pass(f"Valid filename accepted: {filename}")
               else:
                   results.add_fail(
                       f"Valid filename: {filename}", "Path escaped export 
directory"
                   )
           except Exception as e:
               results.add_fail(
                   f"Valid filename: {filename}", f"Unexpected rejection: {e}"
               )
   
   
   def test_invalid_characters(temp_dir, results):
       """Test that filenames with invalid characters are rejected"""
   
       invalid_filenames = [
           "file name.csv",  # space
           "file;name.csv",  # semicolon
           "file:name.csv",  # colon
           "file*name.csv",  # asterisk
           "file?name.csv",  # question mark
           "file|name.csv",  # pipe
           "file<name.csv",  # less than
           "file>name.csv",  # greater than
           'file"name.csv',  # quote
           "file\\name.csv",  # backslash
           "file/name.csv",  # forward slash
       ]
   
       for filename in invalid_filenames:
           try:
               sanitize_filename(filename, temp_dir)
               results.add_fail(
                   f"Invalid character test: {filename}", "Should have been 
rejected"
               )
           except ValueError as e:
               if "Invalid filename" in str(e):
                   results.add_pass(f"Invalid character rejected: {filename}")
               else:
                   results.add_fail(f"Invalid character: {filename}", f"Wrong 
error: {e}")
           except Exception as e:
               results.add_fail(f"Invalid character: {filename}", f"Unexpected 
error: {e}")
   
   
   def test_edge_cases(temp_dir, results):
       """Test edge cases"""
   
       # Test 1: Empty filename
       try:
           sanitize_filename("", temp_dir)
           results.add_fail("Empty filename", "Should have raised ValueError")
       except ValueError:
           results.add_pass("Empty filename rejected")
       except Exception as e:
           results.add_fail("Empty filename", f"Unexpected error: {e}")
   
       # Test 2: Just dots
       try:
           sanitize_filename("..", temp_dir)
           results.add_fail("Just dots", "Should have raised ValueError")
       except ValueError:
           results.add_pass("Just dots rejected")
       except Exception as e:
           results.add_fail("Just dots", f"Unexpected error: {e}")
   
       # Test 3: Single dot
       try:
           sanitize_filename(".", temp_dir)
           results.add_fail("Single dot", "Should have raised ValueError")
       except ValueError:
           results.add_pass("Single dot rejected")
       except Exception as e:
           results.add_fail("Single dot", f"Unexpected error: {e}")
   
       # Test 4: Hidden file (starts with dot) - should be valid
       try:
           result = sanitize_filename(".hidden.csv", temp_dir)
           if os.path.realpath(result).startswith(os.path.realpath(temp_dir)):
               results.add_pass("Hidden file accepted")
           else:
               results.add_fail("Hidden file", "Path escaped export directory")
       except Exception as e:
           results.add_fail("Hidden file", f"Unexpected error: {e}")
   
   
   def test_boundary_validation(temp_dir, results):
       """Test that files cannot escape the export directory"""
   
       # Create a subdirectory in temp
       subdir = os.path.join(temp_dir, "subdir")
       os.makedirs(subdir, exist_ok=True)
   
       # Test 1: File in parent directory
       try:
           # Try to write to parent of temp_dir
           parent_dir = os.path.dirname(temp_dir)
           filename = os.path.join("..", os.path.basename(parent_dir), 
"attack.csv")
           sanitize_filename(filename, temp_dir)
           results.add_fail("Escape to parent directory", "Should have been 
blocked")
       except ValueError:
           results.add_pass("Escape to parent directory blocked")
       except Exception as e:
           results.add_fail("Escape to parent directory", f"Unexpected error: 
{e}")
   
       # Test 2: Verify normalized path stays in boundary
       try:
           result = sanitize_filename("test.csv", temp_dir)
           result_real = os.path.realpath(result)
           temp_real = os.path.realpath(temp_dir)
   
           if result_real.startswith(temp_real):
               results.add_pass("Normalized path within boundary")
           else:
               results.add_fail("Normalized path boundary", f"Path escaped: 
{result_real}")
       except Exception as e:
           results.add_fail("Normalized path boundary", f"Error: {e}")
   
   
   def test_exploit_scenarios(temp_dir, results):
       """Test the actual exploit scenarios from the security report"""
   
       exploit_filenames = [
           "../../../tmp/test_traversal.csv",
           "../../../../Windows/Startup/backdoor.csv",
           "../../../etc/cron.d/backdoor.csv",
           "../../../../../../home/user/.ssh/authorized_keys.csv",
           "../../../../var/www/html/shell.csv",
       ]
   
       for filename in exploit_filenames:
           try:
               sanitize_filename(filename, temp_dir)
               results.add_fail(f"Exploit scenario: {filename}", "Exploit not 
blocked!")
           except ValueError:
               results.add_pass(f"Exploit scenario blocked: {filename}")
           except Exception as e:
               results.add_fail(f"Exploit scenario: {filename}", f"Unexpected 
error: {e}")
   
   
   def main():
       print("=" * 70)
       print("Apache IoTDB MCP Server: Fix filename validation in export 
functions Validation")
       print("Author: Mohammed Tanveer (threatpointer)")
       print("=" * 70 + "\n")
   
       results = TestResults()
   
       # Create a temporary directory for testing
       with tempfile.TemporaryDirectory() as temp_dir:
           print(f"Test export directory: {temp_dir}\n")
   
           print("Running Path Traversal Attack Tests...")
           test_path_traversal_attacks(temp_dir, results)
           print()
   
           print("Running Valid Filename Tests...")
           test_valid_filenames(temp_dir, results)
           print()
   
           print("Running Invalid Character Tests...")
           test_invalid_characters(temp_dir, results)
           print()
   
           print("Running Edge Case Tests...")
           test_edge_cases(temp_dir, results)
           print()
   
           print("Running Boundary Validation Tests...")
           test_boundary_validation(temp_dir, results)
           print()
   
           print("Running Exploit Scenario Tests...")
           test_exploit_scenarios(temp_dir, results)
           print()
   
       # Print summary
       success = results.summary()
   
       # Exit with appropriate code
       sys.exit(0 if success else 1)
   
   
   if __name__ == "__main__":
       main()
   ```
   
   ## Impact
   
   - **Backward Compatibility**: ✓ Maintained - Valid filenames continue to work
   - **Validation**: ✓ Significantly improved with multi-layer checks
   - **Performance**: ✓ Minimal overhead from validation checks
   - **Error Handling**: ✓ Clear error messages for invalid filenames
   
   ## Testing Instructions
   
   To verify the fix locally:
   
   ```bash
   # Run the comprehensive test suite
   python file_validation.py
   
   # Expected output: 32/32 tests passed
   ```
   
   ## Attribution
   
   **Author**: Mohammed Tanveer (threatpointer)
   
   ---
   
   *This fix has been thoroughly tested with 32 comprehensive test cases 
covering all edge cases and validation scenarios.*
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to