hainenber commented on code in PR #37982:
URL: https://github.com/apache/superset/pull/37982#discussion_r2808547144


##########
tests/integration_tests/utils/encrypt_tests.py:
##########
@@ -86,3 +87,79 @@ def test_ensure_encrypted_field_factory_is_used(self):
                         " was not created using the"
                         " encrypted_field_factory"
                     )
+
+    def test_lazy_key_resolution(self):
+        """
+        Verify that the encryption key is resolved lazily at runtime,
+        not captured statically at field creation time.
+        """
+        original_key = self.app.config["SECRET_KEY"]
+        field = encrypted_field_factory.create(String(1024))
+
+        # Key should initially resolve to the current SECRET_KEY
+        assert callable(field.key)
+        assert field.key() == original_key
+
+        # Simulate a key change (e.g. config override, env var update)
+        new_key = "ROTATED_TEST_KEY_12345"
+        self.app.config["SECRET_KEY"] = new_key
+
+        # The field's key should now resolve to the new value
+        assert field.key() == new_key
+
+        # Restore original key
+        self.app.config["SECRET_KEY"] = original_key
+        assert field.key() == original_key
+
+    def test_secret_key_rotation(self):
+        """
+        End-to-end test: encrypt data with KEY_A, rotate to KEY_B,
+        run re-encryption, and verify data is accessible under KEY_B.
+        """
+        from sqlalchemy.engine import make_url
+
+        key_a = self.app.config["SECRET_KEY"]
+        key_b = "NEW_ROTATION_TEST_KEY_67890"
+        test_value = "super_secret_password_123"
+
+        field = encrypted_field_factory.create(String(1024))
+        dialect = make_url("sqlite://").get_dialect()
+
+        # Step 1: Encrypt with KEY_A
+        encrypted_a = field.process_bind_param(test_value, dialect)
+        assert encrypted_a is not None
+        assert encrypted_a != test_value
+
+        # Step 2: Verify decryption with KEY_A works
+        decrypted = field.process_result_value(encrypted_a, dialect)
+        assert decrypted == test_value
+
+        # Step 3: Rotate key to KEY_B
+        self.app.config["SECRET_KEY"] = key_b
+
+        # Step 4: Re-encrypt with KEY_B (simulating SecretsMigrator logic)
+        # Decrypt using previous key
+        previous_field = EncryptedType(type_in=field.underlying_type, 
key=key_a)
+        decrypted_with_prev = previous_field.process_result_value(encrypted_a, 
dialect)
+        assert decrypted_with_prev == test_value
+
+        # Re-encrypt using current key (KEY_B, resolved via lambda)
+        encrypted_b = field.process_bind_param(decrypted_with_prev, dialect)
+        assert encrypted_b is not None
+        assert encrypted_b != encrypted_a  # Different ciphertext
+
+        # Step 5: Verify decryption with KEY_B works
+        decrypted_b = field.process_result_value(encrypted_b, dialect)
+        assert decrypted_b == test_value
+
+        # Step 6: Verify KEY_A can no longer decrypt the new ciphertext
+        self.app.config["SECRET_KEY"] = key_a
+        try:
+            bad_decrypt = field.process_result_value(encrypted_b, dialect)
+            # If decryption doesn't raise, value should be garbage
+            assert bad_decrypt != test_value
+        except (ValueError, Exception):
+            pass  # Expected: old key cannot decrypt new ciphertext

Review Comment:
   there should be an assertion here for the thrown error to ensure that indeed 
it's the decryption issue



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to