Source code for django_postgres_anon.context_managers

"""Context managers for dynamic anonymization role switching"""

import contextlib
import logging
from typing import Any, Dict, Generator, Optional

from django.db import connection

from django_postgres_anon.config import get_anon_setting
from django_postgres_anon.models import MaskedRole
from django_postgres_anon.utils import reset_role, switch_to_role

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def anonymized_data(role_name: Optional[str] = None, auto_create: bool = True) -> Generator[None, None, None]:
    """
    Run the enclosed block under a masked (anonymized) database role.

    While the context is active the connection operates as the masked role,
    so queries are expected to return anonymized values, provided the
    PostgreSQL Anonymizer extension and its masking rules are configured.
    The original connection state is restored on exit, whether or not the
    block raised.

    Args:
        role_name: Masked role to switch to. When ``None``, the value of the
            ``ANONYMIZED_DATA_ROLE`` setting is used.
        auto_create: Forwarded to the role-switching helper; when true the
            role may be created on demand and its tracking record updated.

    Example:
        >>> with anonymized_data():
        ...     users = User.objects.all()  # Returns anonymized user data

        >>> with anonymized_data('custom_masked_role'):
        ...     sensitive_data = SensitiveModel.objects.all()

    Raises:
        RuntimeError: If switching to the masked role fails.
        Exception: Any error raised during setup or inside the block is
            logged and re-raised unchanged.
    """
    # Fall back to the configured role when the caller did not name one.
    role_name = get_anon_setting("ANONYMIZED_DATA_ROLE") if role_name is None else role_name

    # Tracks what has to be undone when the context exits.
    ctx_state = _initialize_context_state()

    try:
        # Record the pre-existing isolation level first, then switch roles;
        # the flag is only set once the switch has fully succeeded.
        ctx_state["original_isolation_level"] = _capture_transaction_state()
        ctx_state["role_switched"] = _setup_masked_role(role_name, auto_create)
        yield
    except Exception as exc:
        logger.error(f"Error in anonymized_data context: {exc}")
        raise
    finally:
        # Always put the connection back the way it was found.
        _restore_original_state(ctx_state)
def _initialize_context_state() -> Dict[str, Any]:
    """Initialize state tracking for the context manager.

    Makes sure the database connection is open, then returns a fresh state
    dict.

    Returns:
        A dict with ``original_isolation_level`` set to ``None`` and
        ``role_switched`` set to ``False``.
    """
    connection.ensure_connection()
    connection.get_autocommit()  # Ensure connection is established
    return {"original_isolation_level": None, "role_switched": False}


def _capture_transaction_state() -> Optional[str]:
    """Capture current transaction isolation level if in a transaction.

    Returns:
        The value reported by ``SHOW transaction_isolation`` when the
        connection is inside an atomic block, otherwise ``None`` (also
        ``None`` if the query yields no row).
    """
    if not connection.in_atomic_block:
        return None

    with connection.cursor() as cursor:
        cursor.execute("SHOW transaction_isolation")
        result = cursor.fetchone()
        return result[0] if result else None


def _setup_masked_role(role_name: str, auto_create: bool) -> bool:
    """Switch to masked role and handle auto-creation.

    Args:
        role_name: Name of the masked role to switch to.
        auto_create: When true, the MaskedRole tracking record is updated
            first and the flag is forwarded to ``switch_to_role``.

    Returns:
        ``True`` once the role has been switched and the post-switch check
        has run.

    Raises:
        RuntimeError: If ``switch_to_role`` reports failure.
        Exception: Any error raised by the post-switch check; the role is
            reset (best effort) before the error is re-raised.
    """
    # Update database records BEFORE switching roles (while we still have permissions)
    if auto_create:
        _update_masked_role_record(role_name)

    # Switch to the masked role
    if not switch_to_role(role_name, auto_create=auto_create):
        raise RuntimeError(f"Failed to switch to masked role: {role_name}")

    # Verify the switch was successful
    try:
        _verify_role_switch(role_name)
    except Exception:
        # The role is already switched at this point, but the caller only
        # records that fact after this function returns. Undo the switch
        # here so a failed check does not leave the connection masked.
        try:
            reset_role()
        except Exception as reset_error:
            # Log only, so the original verification error is the one raised.
            logger.error(f"Error resetting role after failed verification: {reset_error}")
        raise

    return True


def _update_masked_role_record(role_name: str) -> None:
    """Create or update MaskedRole record for tracking.

    Best effort: any exception is logged as a warning and swallowed so that
    a bookkeeping failure does not prevent the role switch.

    Args:
        role_name: Name of the role whose record should be marked applied.
    """
    try:
        # First check if record already exists and is applied
        existing_role = MaskedRole.objects.filter(role_name=role_name, is_applied=True).first()
        if existing_role:
            # Already exists and applied, no need to update
            return  # pragma: no cover

        # Create or update the record
        masked_role, created = MaskedRole.objects.get_or_create(
            role_name=role_name,
            defaults={"is_applied": True, "description": "Auto-created role for anonymized data access"},
        )

        # A pre-existing record that was not yet applied gets flipped to applied.
        if not created and not masked_role.is_applied:
            masked_role.is_applied = True
            masked_role.save()
    except Exception as e:
        # Don't fail if we can't update the record - the role switching is more important
        logger.warning(f"Failed to update MaskedRole record for {role_name}: {e}")


def _verify_role_switch(role_name: str) -> None:
    """Verify that the role switch was successful.

    Queries ``CURRENT_USER`` and logs it at debug level. The result is not
    compared against ``role_name``; the check only confirms that a query can
    be executed after the switch.

    Args:
        role_name: Name of the role that was switched to (currently unused).
    """
    with connection.cursor() as cursor:
        cursor.execute("SELECT CURRENT_USER")
        current_user = cursor.fetchone()[0]
        logger.debug(f"Switched to database user: {current_user}")


def _restore_original_state(state: Dict[str, Any]) -> None:
    """Restore the original database connection state.

    Errors are logged and never raised, so that cleanup cannot mask an
    exception already propagating from the context body.

    Args:
        state: Dict with the ``role_switched`` and
            ``original_isolation_level`` keys produced during setup.
    """
    try:
        # Reset role if it was switched
        if state["role_switched"]:
            reset_role()

        # Restore transaction isolation level if needed
        if state["original_isolation_level"] and connection.in_atomic_block:
            _restore_isolation_level(state["original_isolation_level"])
    except Exception as e:
        logger.error(f"Error restoring original state: {e}")
        # Don't raise here as it might mask the original exception


def _restore_isolation_level(isolation_level: str) -> None:
    """Restore the original transaction isolation level.

    Args:
        isolation_level: Value to set. It is interpolated directly into the
            statement; within this module it only ever comes from
            ``SHOW transaction_isolation``.
    """
    with connection.cursor() as cursor:
        cursor.execute(f"SET transaction_isolation = '{isolation_level}'")