
Infrastructure as Code with Peakhour

This guide shows you how to manage Peakhour configurations using Infrastructure as Code (IaC) principles, enabling version control, automated deployments, and consistent environment management across your development lifecycle.

Before you begin: Understand API key management and have basic knowledge of Git workflows and CI/CD pipelines.

Understanding Peakhour's Configuration Management

Peakhour provides a configuration state system that enables a Git-like workflow for infrastructure changes, with commit/rollback capabilities and full version history.

Configuration State Architecture

State Management Features:

  • Version Control: Track all configuration changes with commit messages
  • Rollback Capability: Revert to any previous configuration version
  • Diff Visualization: See exactly what changed between versions
  • Atomic Operations: All changes applied together or not at all
  • History Tracking: Complete audit trail of configuration modifications

Configuration Components:

ConfigStateModel = {
  rules: Record<string, RuleAdd>,           // Firewall and processing rules
  policies: Record<string, PolicyAdd>,      // Edge Access policies  
  applications: Record<string, ApplicationAdd>, // Application definitions
  auth_config: AuthServerConfigAdd,        // Authentication settings
  lists: Record<string, EdgeAccessListAdd>, // IP and string lists
  metadata: Record<string, unknown>        // Custom metadata
}
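
You can also query this state system directly. The following is a minimal sketch that calls the same `/edge-access/config/state` and `/edge-access/config/history` endpoints used by the deployment script later in this guide; adjust paths and response fields if your API version differs.

```python
#!/usr/bin/env python3
# Minimal sketch: inspect the current configuration state and its version history.
# Assumes the endpoints and response shapes used by the deployment script below.
import os
import requests

API_BASE = "https://api.peakhour.io"
token = os.environ["PEAKHOUR_API_TOKEN"]
domain = os.environ["DOMAIN"]
headers = {"Authorization": f"Bearer {token}"}

# Current committed configuration state
state = requests.get(f"{API_BASE}/domains/{domain}/edge-access/config/state", headers=headers)
state.raise_for_status()
print("Current rules:", list(state.json().get("state", {}).get("rules", {}).keys()))

# Version history (version numbers and commit messages)
history = requests.get(f"{API_BASE}/domains/{domain}/edge-access/config/history", headers=headers)
history.raise_for_status()
for entry in history.json().get("history", []):
    print(entry.get("version"), entry.get("message"))
```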

Set Up Configuration Repository

Initialize Configuration Repository

Create a Git repository for your Peakhour configurations:

# Create new repository
mkdir peakhour-config
cd peakhour-config
git init

# Create directory structure
mkdir -p environments/{dev,staging,prod}
mkdir -p modules/{security,access-control,performance}
mkdir -p scripts/{deploy,validate,rollback}

# Initialize configuration files
touch environments/dev/config.json
touch environments/staging/config.json  
touch environments/prod/config.json

Configuration File Structure

Environment-Specific Configuration (environments/prod/config.json):

{
  "domain": "example.com",
  "environment": "production",
  "rules": {
    "security_baseline": {
      "name": "Security Baseline",
      "description": "Core security rules for production",
      "wirefilter_rule": "ip.geoip.country in {\"CN\", \"RU\"} and not (ip.src in $trusted_partners)"
    },
    "api_protection": {
      "name": "API Rate Protection", 
      "description": "Protect API endpoints from abuse",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and rate(1m) > 100"
    }
  },
  "policies": {
    "admin_access": {
      "name": "Admin Access Policy",
      "description": "Secure admin area access",
      "action": "logingate",
      "rule_ids": ["admin_ip_whitelist", "business_hours"]
    }
  },
  "lists": {
    "trusted_partners": {
      "name": "Trusted Partner IPs",
      "type": "ip", 
      "ips": ["203.0.113.0/24", "198.51.100.0/24"]
    }
  }
}

Module-Based Configuration (modules/security/waf-baseline.json):

{
  "name": "WAF Security Baseline",
  "description": "Standard WAF configuration for all environments",
  "rules": {
    "sql_injection_block": {
      "name": "Block SQL Injection",
      "description": "Detect and block SQL injection attempts", 
      "wirefilter_rule": "waf.matched_rule.tags contains \"sql\" and waf.matched_rule.severity eq \"CRITICAL\""
    },
    "xss_protection": {
      "name": "XSS Protection",
      "description": "Block cross-site scripting attempts",
      "wirefilter_rule": "waf.matched_rule.tags contains \"xss\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
    }
  }
}
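
Modules are merged into the environment configuration before deployment. The merge is a shallow, per-section dictionary update, so a module entry with the same key as an environment entry overrides it. A small illustration of that behaviour (the deployment script below implements the same logic in `merge_modules`):

```python
# Illustration of how a module is merged into an environment config: each
# top-level section ('rules', 'policies', 'applications', 'lists') is a shallow
# dict.update(), so a module entry with the same key replaces the base entry.
import json

with open("environments/prod/config.json") as f:
    base = json.load(f)
with open("modules/security/waf-baseline.json") as f:
    module = json.load(f)

merged = dict(base)
for section in ("rules", "policies", "applications", "lists"):
    if section in module:
        merged.setdefault(section, {}).update(module[section])

print("Rules after merge:", sorted(merged["rules"].keys()))
```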

Environment Variables and Secrets

.env.example:

# API Configuration
PEAKHOUR_API_TOKEN=your_api_token_here
PEAKHOUR_API_BASE_URL=https://api.peakhour.io

# Environment Settings  
ENVIRONMENT=production
DOMAIN=example.com

# Deployment Settings
AUTO_COMMIT=true
COMMIT_MESSAGE_PREFIX="[automated]"
ROLLBACK_ON_ERROR=true
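
The `.env` file is only for local runs; CI pipelines inject these values as real environment variables. Below is a minimal loader sketch for local use (python-dotenv works equally well); keep the real `.env` out of version control and commit only `.env.example`.

```python
# Minimal .env loader for local runs. CI pipelines inject these values as real
# environment variables instead, so this is purely a local convenience.
import os

def load_env(path: str = ".env") -> None:
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip().strip('"'))

load_env()
print(os.getenv("DOMAIN"), os.getenv("ENVIRONMENT"))
```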

Secrets Management (using GitHub Secrets, AWS Parameter Store, etc.):

# Store API tokens securely
# GitHub Actions: Repository Settings → Secrets
PEAKHOUR_API_TOKEN_DEV
PEAKHOUR_API_TOKEN_STAGING  
PEAKHOUR_API_TOKEN_PROD

Create Deployment Scripts

Configuration Deployment Script

View script: scripts/deploy/deploy.py

```python

#!/usr/bin/env python3

import os
import json
import requests
import sys
from datetime import datetime
from typing import Dict, Any


class PeakhourDeployer:
    def __init__(self, api_token: str, domain: str, environment: str):
        self.api_token = api_token
        self.domain = domain
        self.environment = environment
        self.base_url = "https://api.peakhour.io"

    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from JSON file"""
        with open(config_path, 'r') as f:
            return json.load(f)

    def merge_modules(self, base_config: Dict[str, Any], module_paths: list) -> Dict[str, Any]:
        """Merge module configurations into base config"""
        merged_config = base_config.copy()

        for module_path in module_paths:
            with open(module_path, 'r') as f:
                module_config = json.load(f)

            # Merge rules
            if 'rules' in module_config:
                merged_config.setdefault('rules', {}).update(module_config['rules'])

            # Merge other components
            for key in ['policies', 'applications', 'lists']:
                if key in module_config:
                    merged_config.setdefault(key, {}).update(module_config[key])

        return merged_config

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate configuration before deployment"""
        required_fields = ['rules', 'domain']

        for field in required_fields:
            if field not in config:
                print(f"❌ Missing required field: {field}")
                return False

        # Validate rule syntax
        for rule_name, rule_config in config.get('rules', {}).items():
            if not rule_config.get('wirefilter_rule'):
                print(f"❌ Rule '{rule_name}' missing wirefilter_rule")
                return False

        print("✅ Configuration validation passed")
        return True

    def get_current_state(self) -> Dict[str, Any]:
        """Get current configuration state from Peakhour"""
        url = f"{self.base_url}/domains/{self.domain}/edge-access/config/state"
        headers = {"Authorization": f"Bearer {self.api_token}"}

        response = requests.get(url, headers=headers)
        response.raise_for_status()

        return response.json()

    def apply_config(self, config: Dict[str, Any], commit_message: str = None) -> Dict[str, Any]:
        """Apply configuration to Peakhour"""
        if not commit_message:
            commit_message = f"Deploy {self.environment} configuration - {datetime.now().isoformat()}"

        url = f"{self.base_url}/domains/{self.domain}/edge-access/config/apply"
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        payload = {
            "state": config,
            "message": commit_message
        }

        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()

        return response.json()

    def commit_changes(self, message: str) -> Dict[str, Any]:
        """Commit pending configuration changes"""
        url = f"{self.base_url}/domains/{self.domain}/edge-access/config/commit"
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        payload = {"message": message}

        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()

        return response.json()

    def get_version_history(self) -> list:
        """Get configuration version history"""
        url = f"{self.base_url}/domains/{self.domain}/edge-access/config/history"
        headers = {"Authorization": f"Bearer {self.api_token}"}

        response = requests.get(url, headers=headers)
        response.raise_for_status()

        return response.json()['history']

    def rollback_to_version(self, version: int, commit_message: str = None) -> Dict[str, Any]:
        """Rollback to specific configuration version"""
        if not commit_message:
            commit_message = f"Rollback to version {version} - {datetime.now().isoformat()}"

        url = f"{self.base_url}/domains/{self.domain}/edge-access/config/rollback"
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        payload = {
            "version": version,
            "message": commit_message,
            "commit": True
        }

        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()

        return response.json()

def main():
    # Load environment variables
    api_token = os.getenv('PEAKHOUR_API_TOKEN')
    domain = os.getenv('DOMAIN')
    environment = os.getenv('ENVIRONMENT', 'dev')

    if not api_token or not domain:
        print("❌ Missing required environment variables: PEAKHOUR_API_TOKEN, DOMAIN")
        sys.exit(1)

    deployer = PeakhourDeployer(api_token, domain, environment)

    # Load base configuration
    config_path = f"environments/{environment}/config.json"
    try:
        base_config = deployer.load_config(config_path)
        print(f"✅ Loaded base configuration from {config_path}")
    except FileNotFoundError:
        print(f"❌ Configuration file not found: {config_path}")
        sys.exit(1)

    # Load and merge modules
    module_paths = [
        "modules/security/waf-baseline.json",
        "modules/performance/caching-rules.json"
    ]

    available_modules = [path for path in module_paths if os.path.exists(path)]
    if available_modules:
        merged_config = deployer.merge_modules(base_config, available_modules)
        print(f"✅ Merged {len(available_modules)} modules")
    else:
        merged_config = base_config

    # Validate configuration
    if not deployer.validate_config(merged_config):
        print("❌ Configuration validation failed")
        sys.exit(1)

    # Apply configuration
    try:
        commit_msg = os.getenv('COMMIT_MESSAGE', f"Deploy {environment} configuration")
        result = deployer.apply_config(merged_config, commit_msg)
        print("✅ Configuration deployed successfully")
        print(f"   Version: {result.get('version', 'N/A')}")
        print(f"   Timestamp: {result.get('timestamp', 'N/A')}")

    except requests.exceptions.RequestException as e:
        print(f"❌ Deployment failed: {e}")

        # Rollback on error if enabled
        if os.getenv('ROLLBACK_ON_ERROR', 'false').lower() == 'true':
            try:
                history = deployer.get_version_history()
                if len(history) > 1:  # More than just the current version
                    previous_version = history[1]['version']  # Second entry is the previous version
                    print(f"🔄 Rolling back to version {previous_version}")
                    rollback_result = deployer.rollback_to_version(
                        previous_version,
                        f"Auto-rollback due to deployment error: {str(e)}"
                    )
                    print(f"✅ Rollback completed: {rollback_result}")
                else:
                    print("❌ No previous version available for rollback")
            except Exception as rollback_error:
                print(f"❌ Rollback failed: {rollback_error}")

        sys.exit(1)

if __name__ == "__main__":
    main()
```
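
Because the state endpoint exposes the currently committed configuration, you can also preview what a deployment would change before calling apply. Below is a rough sketch that reuses the `PeakhourDeployer` class above; it assumes `deploy.py` is importable (for example, run from the repository root with `PYTHONPATH=scripts/deploy`) and performs a simple key-level comparison rather than Peakhour's server-side diff view.

```python
# Sketch: preview which rules a deployment would add or change before applying.
# Assumes PeakhourDeployer from scripts/deploy/deploy.py is importable.
import os
from deploy import PeakhourDeployer

deployer = PeakhourDeployer(os.environ["PEAKHOUR_API_TOKEN"], os.environ["DOMAIN"], "prod")
desired = deployer.load_config("environments/prod/config.json")
current = deployer.get_current_state().get("state", {})

desired_rules = desired.get("rules", {})
current_rules = current.get("rules", {})

for name, rule in desired_rules.items():
    if name not in current_rules:
        print(f"+ rule will be added: {name}")
    elif current_rules[name] != rule:
        print(f"~ rule will change: {name}")
for name in current_rules:
    if name not in desired_rules:
        print(f"- rule exists remotely but not in the repo: {name}")
```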

Configuration Validation Script

View script: scripts/validate/validate.py

```python

#!/usr/bin/env python3

import json
import re
import sys
from typing import Dict, Any, List


class ConfigValidator:
    def __init__(self):
        self.errors = []
        self.warnings = []

    def validate_wirefilter_syntax(self, rule: str) -> bool:
        """Basic wirefilter syntax validation"""
        # Check for common syntax errors
        if rule.count('(') != rule.count(')'):
            return False

        if rule.count('{') != rule.count('}'):
            return False

        if rule.count('"') % 2 != 0:
            return False

        # Check for valid operators
        valid_operators = [
            'eq', '==', 'ne', '!=', 'gt', '>', 'lt', '<', 'ge', '>=', 'le', '<=',
            'contains', 'starts_with', 'ends_with', 'matches', 'in', 'not in',
            'and', 'or', 'not'
        ]

        return True  # More sophisticated validation would check actual syntax

    def validate_rule(self, rule_name: str, rule_config: Dict[str, Any]) -> List[str]:
        """Validate individual rule configuration"""
        errors = []

        # Required fields
        if not rule_config.get('name'):
            errors.append(f"Rule '{rule_name}' missing 'name' field")

        if not rule_config.get('wirefilter_rule'):
            errors.append(f"Rule '{rule_name}' missing 'wirefilter_rule' field")
        else:
            if not self.validate_wirefilter_syntax(rule_config['wirefilter_rule']):
                errors.append(f"Rule '{rule_name}' has invalid wirefilter syntax")

        # Optional but recommended fields
        if not rule_config.get('description'):
            self.warnings.append(f"Rule '{rule_name}' missing description")

        return errors

    def validate_policy(self, policy_name: str, policy_config: Dict[str, Any]) -> List[str]:
        """Validate policy configuration"""
        errors = []

        required_fields = ['name', 'action']
        for field in required_fields:
            if not policy_config.get(field):
                errors.append(f"Policy '{policy_name}' missing '{field}' field")

        # Validate action values
        valid_actions = ['allow', 'block', 'bypass', 'serviceauth', 'logingate']
        if policy_config.get('action') not in valid_actions:
            errors.append(f"Policy '{policy_name}' has invalid action. Valid actions: {valid_actions}")

        return errors

    def validate_list(self, list_name: str, list_config: Dict[str, Any]) -> List[str]:
        """Validate list configuration"""
        errors = []

        if not list_config.get('name'):
            errors.append(f"List '{list_name}' missing 'name' field")

        if not list_config.get('type'):
            errors.append(f"List '{list_name}' missing 'type' field")
        else:
            valid_types = ['ip', 'string', 'int']
            if list_config['type'] not in valid_types:
                errors.append(f"List '{list_name}' has invalid type. Valid types: {valid_types}")

        return errors

    def validate_ip_addresses(self, ips: List[str]) -> List[str]:
        """Validate IP addresses and CIDR blocks"""
        errors = []

        for ip in ips:
            # Basic IP/CIDR validation
            if not re.match(r'^(\d{1,3}\.){3}\d{1,3}(/\d{1,2})?$', ip):
                errors.append(f"Invalid IP address or CIDR block: {ip}")

        return errors

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate complete configuration"""
        self.errors = []
        self.warnings = []

        # Validate rules
        for rule_name, rule_config in config.get('rules', {}).items():
            self.errors.extend(self.validate_rule(rule_name, rule_config))

        # Validate policies
        for policy_name, policy_config in config.get('policies', {}).items():
            self.errors.extend(self.validate_policy(policy_name, policy_config))

        # Validate lists
        for list_name, list_config in config.get('lists', {}).items():
            self.errors.extend(self.validate_list(list_name, list_config))

            # Validate IP addresses if it's an IP list
            if list_config.get('type') == 'ip' and 'ips' in list_config:
                self.errors.extend(self.validate_ip_addresses(list_config['ips']))

        return len(self.errors) == 0

    def print_results(self):
        """Print validation results"""
        if self.errors:
            print("❌ Validation Errors:")
            for error in self.errors:
                print(f"   • {error}")

        if self.warnings:
            print("⚠️ Validation Warnings:")
            for warning in self.warnings:
                print(f"   • {warning}")

        if not self.errors and not self.warnings:
            print("✅ Configuration validation passed with no issues")
        elif not self.errors:
            print("✅ Configuration validation passed with warnings")

def main():
    if len(sys.argv) != 2:
        print("Usage: python validate.py <config_file>")
        sys.exit(1)

    config_file = sys.argv[1]

    try:
        with open(config_file, 'r') as f:
            config = json.load(f)
    except FileNotFoundError:
        print(f"❌ Configuration file not found: {config_file}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"❌ Invalid JSON in configuration file: {e}")
        sys.exit(1)

    validator = ConfigValidator()
    is_valid = validator.validate_config(config)
    validator.print_results()

    if not is_valid:
        sys.exit(1)

if __name__ == "__main__":
    main()
```
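
The CI pipelines below call this validator once per environment file. For a single local pre-flight check, a short wrapper can validate every environment in one pass; this sketch assumes `ConfigValidator` is importable (for example, with `PYTHONPATH=scripts/validate`).

```python
# Sketch: validate every environment config in one pass.
# Assumes ConfigValidator from scripts/validate/validate.py is importable.
import glob
import json
import sys
from validate import ConfigValidator

failed = False
for path in sorted(glob.glob("environments/*/config.json")):
    with open(path) as f:
        config = json.load(f)
    validator = ConfigValidator()
    print(f"--- {path}")
    if not validator.validate_config(config):
        failed = True
    validator.print_results()

sys.exit(1 if failed else 0)
```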

CI/CD Pipeline Integration

GitHub Actions Workflow

View pipeline: .github/workflows/deploy.yml

```yaml
name: Deploy Peakhour Configuration

on:
  push:
    branches: [main, staging, dev]
  pull_request:
    branches: [main]

env:
  PEAKHOUR_API_BASE_URL: https://api.peakhour.io

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install requests

      - name: Validate Configuration
        run: |
          python scripts/validate/validate.py environments/dev/config.json
          python scripts/validate/validate.py environments/staging/config.json
          python scripts/validate/validate.py environments/prod/config.json

  deploy-dev:
    needs: validate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/dev'
    environment: development
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install requests

      - name: Deploy to Development
        env:
          PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_DEV }}
          DOMAIN: dev.example.com
          ENVIRONMENT: dev
          COMMIT_MESSAGE: "[dev] ${{ github.event.head_commit.message }}"
        run: python scripts/deploy/deploy.py

  deploy-staging:
    needs: validate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/staging'
    environment: staging
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install requests

      - name: Deploy to Staging
        env:
          PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_STAGING }}
          DOMAIN: staging.example.com
          ENVIRONMENT: staging
          COMMIT_MESSAGE: "[staging] ${{ github.event.head_commit.message }}"
          ROLLBACK_ON_ERROR: true
        run: python scripts/deploy/deploy.py

  deploy-production:
    needs: validate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install requests

      - name: Deploy to Production
        env:
          PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
          DOMAIN: example.com
          ENVIRONMENT: prod
          COMMIT_MESSAGE: "[prod] ${{ github.event.head_commit.message }}"
          ROLLBACK_ON_ERROR: true
        run: python scripts/deploy/deploy.py

      - name: Health Check
        env:
          PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
          DOMAIN: example.com
        run: |
          # Wait for deployment to propagate
          sleep 30

          # Basic health check
          curl -f "https://example.com/health" || exit 1

          # Check Peakhour API health
          curl -f "https://api.peakhour.io/domains/example.com/health" \
            -H "Authorization: Bearer $PEAKHOUR_API_TOKEN" || exit 1

  rollback-production:
    runs-on: ubuntu-latest
    if: failure() && github.ref == 'refs/heads/main'
    needs: deploy-production
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install requests

      - name: Rollback Production
        env:
          PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
          DOMAIN: example.com
        run: |
          python - <<'EOF'
          import os
          import requests

          api_token = os.getenv('PEAKHOUR_API_TOKEN')
          domain = os.getenv('DOMAIN')

          # Get version history
          url = f'https://api.peakhour.io/domains/{domain}/edge-access/config/history'
          headers = {'Authorization': f'Bearer {api_token}'}

          response = requests.get(url, headers=headers)
          history = response.json()['history']

          if len(history) > 1:
              previous_version = history[1]['version']

              # Rollback to previous version
              rollback_url = f'https://api.peakhour.io/domains/{domain}/edge-access/config/rollback'
              payload = {
                  'version': previous_version,
                  'message': 'Emergency rollback due to deployment failure',
                  'commit': True
              }

              rollback_response = requests.post(rollback_url, json=payload, headers=headers)
              rollback_response.raise_for_status()

              print(f'✅ Rolled back to version {previous_version}')
          else:
              print('❌ No previous version available for rollback')
          EOF
```

GitLab CI/CD Pipeline

View pipeline: .gitlab-ci.yml

```yaml
stages:
  - validate
  - deploy-dev
  - deploy-staging
  - deploy-prod

variables:
  PEAKHOUR_API_BASE_URL: https://api.peakhour.io

validate-config:
  stage: validate
  image: python:3.9
  before_script:
    - pip install requests
  script:
    - python scripts/validate/validate.py environments/dev/config.json
    - python scripts/validate/validate.py environments/staging/config.json
    - python scripts/validate/validate.py environments/prod/config.json
  only:
    - merge_requests
    - main
    - staging
    - dev

deploy-development:
  stage: deploy-dev
  image: python:3.9
  before_script:
    - pip install requests
  script:
    - python scripts/deploy/deploy.py
  environment:
    name: development
    url: https://dev.example.com
  variables:
    DOMAIN: dev.example.com
    ENVIRONMENT: dev
    COMMIT_MESSAGE: "[dev] ${CI_COMMIT_MESSAGE}"
  only:
    - dev

deploy-staging:
  stage: deploy-staging
  image: python:3.9
  before_script:
    - pip install requests
  script:
    - python scripts/deploy/deploy.py
  environment:
    name: staging
    url: https://staging.example.com
  variables:
    DOMAIN: staging.example.com
    ENVIRONMENT: staging
    COMMIT_MESSAGE: "[staging] ${CI_COMMIT_MESSAGE}"
    ROLLBACK_ON_ERROR: "true"
  only:
    - staging

deploy-production:
  stage: deploy-prod
  image: python:3.9
  before_script:
    - pip install requests
  script:
    - python scripts/deploy/deploy.py
  environment:
    name: production
    url: https://example.com
  variables:
    DOMAIN: example.com
    ENVIRONMENT: prod
    COMMIT_MESSAGE: "[prod] ${CI_COMMIT_MESSAGE}"
    ROLLBACK_ON_ERROR: "true"
  when: manual
  only:
    - main
```
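
Note that the deploy jobs expect `PEAKHOUR_API_TOKEN` to be provided as a protected CI/CD variable (Settings → CI/CD → Variables) rather than being defined in the pipeline file; environment-scoped variables let you supply a different token per environment, mirroring the per-environment secrets used in the GitHub Actions workflow.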

Configuration Modules and Templates

Security Module Template

modules/security/baseline-security.json:

{
  "name": "Security Baseline Module",
  "description": "Standard security rules for all environments",
  "rules": {
    "block_tor_traffic": {
      "name": "Block Tor Network Traffic",
      "description": "Block requests from Tor exit nodes",
      "wirefilter_rule": "ip.src in $tor_exit_nodes"
    },
    "rate_limit_api": {
      "name": "API Rate Limiting",
      "description": "Rate limit API endpoints",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and rate(1m) > 100"
    },
    "block_malicious_uas": {
      "name": "Block Malicious User Agents", 
      "description": "Block known malicious user agents",
      "wirefilter_rule": "lower(http.user_agent) matches \".*(sqlmap|nmap|masscan|zap).*\""
    }
  },
  "lists": {
    "tor_exit_nodes": {
      "name": "Tor Exit Nodes",
      "type": "ip",
      "ips": ["185.220.100.0/24", "185.220.101.0/24"]
    }
  }
}

Performance Module Template

modules/performance/caching-optimization.json:

{
  "name": "Caching Optimization Module",
  "description": "Performance optimization through intelligent caching",
  "rules": {
    "long_cache_static": {
      "name": "Long Cache for Static Assets",
      "description": "Cache static assets for extended periods",
      "wirefilter_rule": "ends_with(http.request.uri.path, \".css\") or ends_with(http.request.uri.path, \".js\") or ends_with(http.request.uri.path, \".png\")"
    },
    "no_cache_dynamic": {
      "name": "No Cache for Dynamic Content", 
      "description": "Prevent caching of dynamic content",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") or contains(http.request.uri.path, \"/admin/\")"
    }
  }
}

Monitoring and Observability

Configuration Drift Detection

View script: scripts/monitor/drift-detection.py

```python

#!/usr/bin/env python3

import json
import os
import requests
import sys
from typing import Dict, Any


def detect_configuration_drift(api_token: str, domain: str, expected_config_path: str):
    """Detect drift between expected and actual configuration"""

    # Load expected configuration
    with open(expected_config_path, 'r') as f:
        expected_config = json.load(f)

    # Get actual configuration from Peakhour
    url = f"https://api.peakhour.io/domains/{domain}/edge-access/config/state"
    headers = {"Authorization": f"Bearer {api_token}"}

    response = requests.get(url, headers=headers)
    response.raise_for_status()

    actual_config = response.json()['state']

    # Compare configurations
    drift_detected = False

    # Check rules
    expected_rules = expected_config.get('rules', {})
    actual_rules = actual_config.get('rules', {})

    for rule_name, expected_rule in expected_rules.items():
        if rule_name not in actual_rules:
            print(f"❌ Missing rule: {rule_name}")
            drift_detected = True
        elif actual_rules[rule_name] != expected_rule:
            print(f"❌ Rule drift detected: {rule_name}")
            print(f"   Expected: {expected_rule}")
            print(f"   Actual: {actual_rules[rule_name]}")
            drift_detected = True

    # Check for unexpected rules
    for rule_name in actual_rules:
        if rule_name not in expected_rules:
            print(f"⚠️ Unexpected rule found: {rule_name}")
            drift_detected = True

    return not drift_detected


if __name__ == "__main__":
    api_token = os.getenv('PEAKHOUR_API_TOKEN')
    domain = os.getenv('DOMAIN')
    config_path = sys.argv[1] if len(sys.argv) > 1 else 'environments/prod/config.json'

    if detect_configuration_drift(api_token, domain, config_path):
        print("✅ No configuration drift detected")
        sys.exit(0)
    else:
        print("❌ Configuration drift detected")
        sys.exit(1)
```
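
Run this check on a schedule (for example, a scheduled CI pipeline against the production configuration) and alert on a non-zero exit code to catch changes made outside the repository.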

Deployment Monitoring

View script: scripts/monitor/deployment-health.py

```python

#!/usr/bin/env python3

import os
import requests
import sys
import time
from typing import Dict, Any


def monitor_deployment_health(api_token: str, domain: str, timeout: int = 300):
    """Monitor deployment health after configuration changes"""

    url = f"https://api.peakhour.io/domains/{domain}/analytics/overview"
    headers = {"Authorization": f"Bearer {api_token}"}

    start_time = time.time()

    while time.time() - start_time < timeout:
        try:
            response = requests.get(url, headers=headers)
            response.raise_for_status()

            data = response.json()

            # Check key metrics
            error_rate = data.get('kpis', {}).get('origin_error_rate', 0)
            threats_blocked = data.get('kpis', {}).get('total_threats_blocked', 0)

            print(f"🔍 Health Check - Error Rate: {error_rate}%, Threats Blocked: {threats_blocked}")

            # Alert conditions
            if error_rate > 5.0:  # More than 5% error rate
                print(f"⚠️ High error rate detected: {error_rate}%")
                return False

        except requests.exceptions.RequestException as e:
            print(f"❌ Health check failed: {e}")
            return False

        time.sleep(30)  # Check every 30 seconds

    # If the full monitoring window passes without issues, consider the deployment healthy
    print("✅ Deployment appears healthy")
    return True


if __name__ == "__main__":
    api_token = os.getenv('PEAKHOUR_API_TOKEN')
    domain = os.getenv('DOMAIN')

    if monitor_deployment_health(api_token, domain):
        sys.exit(0)
    else:
        sys.exit(1)
```

This Infrastructure as Code approach enables you to manage Peakhour configurations with the same rigor as application code, providing version control, automated testing, rollback capabilities, and comprehensive monitoring for your edge security and performance configurations.