Optimization Best Practices

This guide presents best practices for using quActuary’s optimization features effectively, based on real-world use cases and performance testing.

General Principles

1. Start Simple, Optimize Incrementally

Begin with default settings and add optimizations based on profiling:

# Step 1: Baseline with defaults
results = model.simulate(n_simulations=10_000)
baseline_time = results.execution_time

# Step 2: Profile to identify bottlenecks
results = model.simulate(n_simulations=10_000, profile=True)
results.show_profile()

# Step 3: Apply targeted optimizations
if results.numerical_operations_percent > 50:
    results = model.simulate(n_simulations=10_000, use_jit=True)

# Step 4: Measure improvement
speedup = baseline_time / results.execution_time
print(f"Optimization speedup: {speedup:.1f}x")

2. Match Optimization to Problem Size

Use this decision matrix:

def select_optimizations(portfolio_size, n_simulations):
    config = {"vectorized": True}  # Always beneficial

    # JIT compilation threshold
    if portfolio_size > 100 or n_simulations > 50_000:
        config["use_jit"] = True

    # Parallel processing threshold
    if portfolio_size > 50 and n_simulations > 10_000:
        config["parallel"] = True

    # Memory management threshold
    memory_required_gb = (portfolio_size * n_simulations * 8) / 1e9
    if memory_required_gb > 4:
        config["memory_limit_gb"] = 4
        config["checkpoint_interval"] = n_simulations // 100

    # QMC for high-precision requirements
    if n_simulations > 100_000:
        config["use_qmc"] = True

    return config

# Apply intelligent defaults
config = select_optimizations(len(portfolio), 1_000_000)
results = model.simulate(n_simulations=1_000_000, **config)

3. Consider Total Time, Not Just Computation

Include setup and compilation time in benchmarks:

import time

# Include all overhead
total_start = time.time()

# First run includes JIT compilation
results = model.simulate(n_simulations=100_000, use_jit=True)

total_time = time.time() - total_start
computation_time = results.execution_time
overhead_time = total_time - computation_time

print(f"Total time: {total_time:.2f}s")
print(f"Computation: {computation_time:.2f}s ({computation_time/total_time*100:.1f}%)")
print(f"Overhead: {overhead_time:.2f}s ({overhead_time/total_time*100:.1f}%)")

JIT Compilation Best Practices

Design JIT-Friendly Functions

Write functions that compile efficiently:

# Good: JIT-friendly function
@numba.jit(nopython=True)
def calculate_losses_good(frequencies, severities, limits):
    n = len(frequencies)
    losses = np.zeros(n)
    for i in range(n):
        loss = frequencies[i] * severities[i]
        losses[i] = min(loss, limits[i])
    return losses

# Bad: JIT-unfriendly function
def calculate_losses_bad(policies):
    losses = []
    for policy in policies:  # Python objects don't compile
        loss = policy.frequency * policy.severity
        losses.append(min(loss, policy.limit))
    return losses

Use Type Annotations for Better Performance

Help the JIT compiler with type hints:

from numba import float64, int32

@numba.jit(
    float64[:](float64[:], float64[:], float64),
    nopython=True,
    cache=True
)
def apply_deductible(losses, deductibles, aggregate_deductible):
    """Apply per-occurrence and aggregate deductibles."""
    n = len(losses)
    net_losses = np.maximum(losses - deductibles, 0.0)
    total_retained = np.sum(losses - net_losses)

    if total_retained < aggregate_deductible:
        # Apply additional aggregate deductible
        remaining = aggregate_deductible - total_retained
        for i in range(n):
            if net_losses[i] > 0:
                reduction = min(net_losses[i], remaining)
                net_losses[i] -= reduction
                remaining -= reduction
                if remaining <= 0:
                    break

    return net_losses

Avoid JIT Compilation Pitfalls

Common issues and solutions:

# Pitfall 1: Python objects in JIT functions
# Bad
@numba.jit
def process_policies_bad(policies):  # List of objects won't compile
    return [p.calculate() for p in policies]

# Good
@numba.jit
def process_policies_good(values, limits):  # Use arrays instead
    return np.minimum(values, limits)

# Pitfall 2: Dynamic types
# Bad
@numba.jit
def dynamic_bad(x):
    if x > 0:
        return x  # returns float
    else:
        return "negative"  # returns string - type inconsistency!

# Good
@numba.jit
def dynamic_good(x):
    if x > 0:
        return x
    else:
        return 0.0  # consistent type

Parallel Processing Best Practices

Optimize Work Distribution

Balance work across processes:

def optimize_parallel_chunks(portfolio_size, n_simulations, n_workers):
    """Calculate optimal chunk sizes for parallel processing."""

    # Minimum chunk size to avoid overhead
    min_chunk = max(1000, n_simulations // (n_workers * 100))

    # Maximum chunk size for good load balancing
    max_chunk = n_simulations // (n_workers * 4)

    # Optimal chunk considering cache efficiency
    optimal_chunk = int(np.sqrt(n_simulations / n_workers) * 10)

    return np.clip(optimal_chunk, min_chunk, max_chunk)

# Use optimized chunks
chunk_size = optimize_parallel_chunks(
    len(portfolio),
    1_000_000,
    os.cpu_count()
)

results = model.simulate(
    n_simulations=1_000_000,
    parallel=True,
    chunk_size=chunk_size
)

Handle Platform Differences

Account for OS-specific behavior:

import platform
import multiprocessing as mp

def get_platform_config():
    """Platform-specific optimization settings."""
    system = platform.system()

    if system == "Windows":
        # Windows has higher process overhead
        return {
            "parallel_threshold": 100,  # Higher threshold
            "start_method": "spawn",
            "max_workers": min(4, mp.cpu_count())  # Limit workers
        }
    elif system == "Darwin":  # macOS
        # macOS has fork safety issues
        return {
            "parallel_threshold": 50,
            "start_method": "spawn",
            "max_workers": mp.cpu_count()
        }
    else:  # Linux
        # Linux has efficient forking
        return {
            "parallel_threshold": 25,
            "start_method": "fork",
            "max_workers": mp.cpu_count()
        }

# Apply platform-specific settings
config = get_platform_config()
if len(portfolio) > config["parallel_threshold"]:
    results = model.simulate(
        n_simulations=100_000,
        parallel=True,
        max_workers=config["max_workers"]
    )

Memory Management Best Practices

Estimate Memory Requirements

Calculate memory needs before running:

def estimate_memory_requirements(portfolio_size, n_simulations):
    """Estimate memory requirements in GB."""

    # Base memory per simulation result
    bytes_per_result = 8  # float64

    # Account for different arrays needed
    arrays_needed = {
        "frequencies": portfolio_size,
        "severities": portfolio_size,
        "losses": portfolio_size,
        "net_losses": portfolio_size,
        "aggregated": 1
    }

    total_elements = sum(arrays_needed.values()) * n_simulations
    base_memory = total_elements * bytes_per_result

    # Add overhead (temporary arrays, Python objects)
    overhead_factor = 2.5
    total_memory_bytes = base_memory * overhead_factor

    return total_memory_bytes / 1e9

# Check before running
required_gb = estimate_memory_requirements(len(portfolio), 10_000_000)
available_gb = psutil.virtual_memory().available / 1e9

if required_gb > available_gb * 0.8:
    print(f"Warning: Need {required_gb:.1f}GB, have {available_gb:.1f}GB")
    print("Enabling memory optimization...")
    results = model.simulate(
        n_simulations=10_000_000,
        memory_limit_gb=available_gb * 0.7,
        checkpoint_interval=100_000
    )
else:
    results = model.simulate(n_simulations=10_000_000)

Implement Streaming for Large Datasets

Process data in chunks to manage memory:

def simulate_streaming(model, n_simulations, chunk_size=100_000):
    """Run simulation in memory-efficient chunks."""

    all_results = []
    n_chunks = (n_simulations + chunk_size - 1) // chunk_size

    for i in range(n_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, n_simulations)
        chunk_sims = end_idx - start_idx

        # Process chunk
        chunk_results = model.simulate(
            n_simulations=chunk_sims,
            progress_bar=False  # Avoid multiple progress bars
        )

        # Extract only essential statistics
        all_results.append({
            'mean': chunk_results.mean(),
            'std': chunk_results.std(),
            'percentiles': chunk_results.percentiles([0.95, 0.99])
        })

        # Force garbage collection between chunks
        import gc
        gc.collect()

    # Combine results
    return combine_chunk_results(all_results)

QMC Best Practices

Choose the Right QMC Engine

Select based on problem characteristics:

def select_qmc_engine(portfolio):
    """Choose QMC engine based on portfolio characteristics."""

    # Calculate effective dimension
    n_random_variables = len(portfolio) * 2  # frequency + severity
    has_correlation = portfolio.correlation_matrix is not None

    if n_random_variables <= 10:
        # Low dimension - Halton is efficient
        return 'halton'
    elif n_random_variables <= 50 and not has_correlation:
        # Medium dimension without correlation - Sobol
        return 'sobol'
    elif has_correlation:
        # Correlation requires special handling
        return 'sobol'  # With scrambling
    else:
        # High dimension - Sobol with dimension reduction
        return 'sobol'

# Apply intelligent QMC selection
qmc_engine = select_qmc_engine(portfolio)
results = model.simulate(
    n_simulations=50_000,
    use_qmc=True,
    qmc_engine=qmc_engine
)

Monitor QMC Convergence

Track convergence to optimize simulation count:

def monitor_qmc_convergence(model, target_precision=0.001):
    """Run QMC with convergence monitoring."""

    n_base = 1000
    results_history = []

    for power in range(10):  # Up to 2^10 * 1000 simulations
        n_sims = n_base * (2 ** power)

        results = model.simulate(
            n_simulations=n_sims,
            use_qmc=True,
            random_state=42  # Consistent sequence
        )

        results_history.append({
            'n': n_sims,
            'mean': results.mean(),
            'std_error': results.standard_error()
        })

        # Check convergence
        if len(results_history) > 1:
            prev_mean = results_history[-2]['mean']
            curr_mean = results_history[-1]['mean']
            relative_change = abs(curr_mean - prev_mean) / abs(prev_mean)

            if relative_change < target_precision:
                print(f"Converged at {n_sims} simulations")
                break

    return results, results_history

Production Deployment

Configuration Management

Use configuration files for production:

# config/optimization.yaml
development:
  use_jit: false
  parallel: false
  n_simulations: 1000
  random_state: 42

staging:
  use_jit: true
  parallel: true
  max_workers: 4
  n_simulations: 100_000
  memory_limit_gb: 8

production:
  use_jit: true
  parallel: true
  max_workers: null  # Use all cores
  n_simulations: 1_000_000
  memory_limit_gb: null  # Auto-detect
  use_qmc: true
  checkpoint_interval: 50_000

Load and apply configuration:

import yaml
import os

def load_optimization_config():
    env = os.environ.get('QUACTUARY_ENV', 'development')

    with open('config/optimization.yaml', 'r') as f:
        config = yaml.safe_load(f)

    return config.get(env, config['development'])

# Use environment-specific configuration
config = load_optimization_config()
results = model.simulate(**config)

Error Handling and Recovery

Implement robust error handling:

import logging
from contextlib import contextmanager

@contextmanager
def safe_simulation(model, n_simulations, **kwargs):
    """Run simulation with comprehensive error handling."""

    logger = logging.getLogger(__name__)
    checkpoint_file = f"checkpoint_{model.id}_{n_simulations}.pkl"

    try:
        # Try optimal configuration
        logger.info(f"Starting simulation with {n_simulations} paths")
        yield model.simulate(n_simulations=n_simulations, **kwargs)

    except MemoryError:
        logger.warning("Memory error, switching to streaming mode")
        yield simulate_streaming(model, n_simulations)

    except mp.ProcessError:
        logger.warning("Parallel processing failed, using single process")
        kwargs['parallel'] = False
        yield model.simulate(n_simulations=n_simulations, **kwargs)

    except KeyboardInterrupt:
        logger.info("Simulation interrupted, saving checkpoint")
        if os.path.exists(checkpoint_file):
            logger.info(f"Checkpoint saved to {checkpoint_file}")
        raise

    finally:
        # Cleanup
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)

# Use safe simulation
with safe_simulation(model, 1_000_000, use_jit=True, parallel=True) as results:
    print(f"Simulation completed: {results.execution_time:.2f}s")

Performance Monitoring

Track performance metrics in production:

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    def track_simulation(self, model, config):
        """Track simulation performance metrics."""

        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss / 1e9

        results = model.simulate(**config)

        end_time = time.time()
        peak_memory = psutil.Process().memory_info().rss / 1e9

        metrics = {
            'timestamp': datetime.now().isoformat(),
            'portfolio_size': len(model.portfolio),
            'n_simulations': config['n_simulations'],
            'execution_time': end_time - start_time,
            'memory_used_gb': peak_memory - start_memory,
            'simulations_per_second': config['n_simulations'] / (end_time - start_time),
            'optimizations': {
                'jit': config.get('use_jit', False),
                'parallel': config.get('parallel', False),
                'qmc': config.get('use_qmc', False)
            }
        }

        self.metrics.append(metrics)
        self.save_metrics()

        return results

    def save_metrics(self):
        """Save metrics for analysis."""
        with open('performance_metrics.json', 'w') as f:
            json.dump(self.metrics, f, indent=2)

# Track production performance
tracker = PerformanceTracker()
results = tracker.track_simulation(model, production_config)

Next Steps