Infrastructure as Code with Peakhour¶
This guide shows you how to manage Peakhour configurations using Infrastructure as Code (IaC) principles, enabling version control, automated deployments, and consistent environment management across your development lifecycle.
Before you begin: Understand API key management and have basic knowledge of Git workflows and CI/CD pipelines.
Understanding Peakhour's Configuration Management¶
Peakhour provides a comprehensive configuration state system that enables Git-like workflow for infrastructure changes with commit/rollback capabilities and version history.
Configuration State Architecture¶
State Management Features:
- Version Control: Track all configuration changes with commit messages
- Rollback Capability: Revert to any previous configuration version
- Diff Visualization: See exactly what changed between versions
- Atomic Operations: All changes applied together or not at all
- History Tracking: Complete audit trail of configuration modifications
Configuration Components:
ConfigStateModel = {
rules: Record<string, RuleAdd>, // Firewall and processing rules
policies: Record<string, PolicyAdd>, // Edge Access policies
applications: Record<string, ApplicationAdd>, // Application definitions
auth_config: AuthServerConfigAdd, // Authentication settings
lists: Record<string, EdgeAccessListAdd>, // IP and string lists
metadata: Record<string, unknown> // Custom metadata
}
Set Up Configuration Repository¶
Initialize Configuration Repository¶
Create a Git repository for your Peakhour configurations:
# Create new repository
mkdir peakhour-config
cd peakhour-config
git init
# Create directory structure
mkdir -p environments/{dev,staging,prod}
mkdir -p modules/{security,access-control,performance}
mkdir -p scripts/{deploy,validate,rollback}
# Initialize configuration files
touch environments/dev/config.json
touch environments/staging/config.json
touch environments/prod/config.json
Configuration File Structure¶
Environment-Specific Configuration (environments/prod/config.json
):
{
"domain": "example.com",
"environment": "production",
"rules": {
"security_baseline": {
"name": "Security Baseline",
"description": "Core security rules for production",
"wirefilter_rule": "ip.geoip.country in {\"CN\", \"RU\"} and not (ip.src in $trusted_partners)"
},
"api_protection": {
"name": "API Rate Protection",
"description": "Protect API endpoints from abuse",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and rate(1m) > 100"
}
},
"policies": {
"admin_access": {
"name": "Admin Access Policy",
"description": "Secure admin area access",
"action": "logingate",
"rule_ids": ["admin_ip_whitelist", "business_hours"]
}
},
"lists": {
"trusted_partners": {
"name": "Trusted Partner IPs",
"type": "ip",
"ips": ["203.0.113.0/24", "198.51.100.0/24"]
}
}
}
Module-Based Configuration (modules/security/waf-baseline.json
):
{
"name": "WAF Security Baseline",
"description": "Standard WAF configuration for all environments",
"rules": {
"sql_injection_block": {
"name": "Block SQL Injection",
"description": "Detect and block SQL injection attempts",
"wirefilter_rule": "waf.matched_rule.tags contains \"sql\" and waf.matched_rule.severity eq \"CRITICAL\""
},
"xss_protection": {
"name": "XSS Protection",
"description": "Block cross-site scripting attempts",
"wirefilter_rule": "waf.matched_rule.tags contains \"xss\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
}
}
}
Environment Variables and Secrets¶
.env.example
:
# API Configuration
PEAKHOUR_API_TOKEN=your_api_token_here
PEAKHOUR_API_BASE_URL=https://api.peakhour.io
# Environment Settings
ENVIRONMENT=production
DOMAIN=example.com
# Deployment Settings
AUTO_COMMIT=true
COMMIT_MESSAGE_PREFIX="[automated]"
ROLLBACK_ON_ERROR=true
Secrets Management (using GitHub Secrets, AWS Parameter Store, etc.):
# Store API tokens securely
# GitHub Actions: Repository Settings → Secrets
PEAKHOUR_API_TOKEN_DEV
PEAKHOUR_API_TOKEN_STAGING
PEAKHOUR_API_TOKEN_PROD
Create Deployment Scripts¶
Configuration Deployment Script¶
import os import json import requests import sys from datetime import datetime from typing import Dict, Any
class PeakhourDeployer: def init(self, api_token: str, domain: str, environment: str): self.api_token = api_token self.domain = domain self.environment = environment self.base_url = "https://api.peakhour.io"
def load_config(self, config_path: str) -> Dict[str, Any]:
"""Load configuration from JSON file"""
with open(config_path, 'r') as f:
return json.load(f)
def merge_modules(self, base_config: Dict[str, Any], module_paths: list) -> Dict[str, Any]:
"""Merge module configurations into base config"""
merged_config = base_config.copy()
for module_path in module_paths:
with open(module_path, 'r') as f:
module_config = json.load(f)
# Merge rules
if 'rules' in module_config:
merged_config.setdefault('rules', {}).update(module_config['rules'])
# Merge other components
for key in ['policies', 'applications', 'lists']:
if key in module_config:
merged_config.setdefault(key, {}).update(module_config[key])
return merged_config
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate configuration before deployment"""
required_fields = ['rules', 'domain']
for field in required_fields:
if field not in config:
print(f"❌ Missing required field: {field}")
return False
# Validate rule syntax
for rule_name, rule_config in config.get('rules', {}).items():
if not rule_config.get('wirefilter_rule'):
print(f"❌ Rule '{rule_name}' missing wirefilter_rule")
return False
print("✅ Configuration validation passed")
return True
def get_current_state(self) -> Dict[str, Any]:
"""Get current configuration state from Peakhour"""
url = f"{self.base_url}/domains/{self.domain}/edge-access/config/state"
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
def apply_config(self, config: Dict[str, Any], commit_message: str = None) -> Dict[str, Any]:
"""Apply configuration to Peakhour"""
if not commit_message:
commit_message = f"Deploy {self.environment} configuration - {datetime.now().isoformat()}"
url = f"{self.base_url}/domains/{self.domain}/edge-access/config/apply"
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
payload = {
"state": config,
"message": commit_message
}
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json()
def commit_changes(self, message: str) -> Dict[str, Any]:
"""Commit pending configuration changes"""
url = f"{self.base_url}/domains/{self.domain}/edge-access/config/commit"
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
payload = {"message": message}
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json()
def get_version_history(self) -> list:
"""Get configuration version history"""
url = f"{self.base_url}/domains/{self.domain}/edge-access/config/history"
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()['history']
def rollback_to_version(self, version: int, commit_message: str = None) -> Dict[str, Any]:
"""Rollback to specific configuration version"""
if not commit_message:
commit_message = f"Rollback to version {version} - {datetime.now().isoformat()}"
url = f"{self.base_url}/domains/{self.domain}/edge-access/config/rollback"
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
payload = {
"version": version,
"message": commit_message,
"commit": True
}
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json()
def main(): # Load environment variables api_token = os.getenv('PEAKHOUR_API_TOKEN') domain = os.getenv('DOMAIN') environment = os.getenv('ENVIRONMENT', 'dev')
if not api_token or not domain:
print("❌ Missing required environment variables: PEAKHOUR_API_TOKEN, DOMAIN")
sys.exit(1)
deployer = PeakhourDeployer(api_token, domain, environment)
# Load base configuration
config_path = f"environments/{environment}/config.json"
try:
base_config = deployer.load_config(config_path)
print(f"✅ Loaded base configuration from {config_path}")
except FileNotFoundError:
print(f"❌ Configuration file not found: {config_path}")
sys.exit(1)
# Load and merge modules
module_paths = [
"modules/security/waf-baseline.json",
"modules/performance/caching-rules.json"
]
available_modules = [path for path in module_paths if os.path.exists(path)]
if available_modules:
merged_config = deployer.merge_modules(base_config, available_modules)
print(f"✅ Merged {len(available_modules)} modules")
else:
merged_config = base_config
# Validate configuration
if not deployer.validate_config(merged_config):
print("❌ Configuration validation failed")
sys.exit(1)
# Apply configuration
try:
commit_msg = os.getenv('COMMIT_MESSAGE', f"Deploy {environment} configuration")
result = deployer.apply_config(merged_config, commit_msg)
print(f"✅ Configuration deployed successfully")
print(f" Version: {result.get('version', 'N/A')}")
print(f" Timestamp: {result.get('timestamp', 'N/A')}")
except requests.exceptions.RequestException as e:
print(f"❌ Deployment failed: {e}")
# Rollback on error if enabled
if os.getenv('ROLLBACK_ON_ERROR', 'false').lower() == 'true':
try:
history = deployer.get_version_history()
if len(history) > 1: # More than just current version
previous_version = history[1]['version'] # Second entry is previous
print(f"🔄 Rolling back to version {previous_version}")
rollback_result = deployer.rollback_to_version(
previous_version,
f"Auto-rollback due to deployment error: {str(e)}"
)
print(f"✅ Rollback completed: {rollback_result}")
else:
print("❌ No previous version available for rollback")
except Exception as rollback_error:
print(f"❌ Rollback failed: {rollback_error}")
sys.exit(1)
if __name__ == "__main__":
main()
```
Configuration Validation Script¶
import json import re import sys from typing import Dict, Any, List
class ConfigValidator: def init(self): self.errors = [] self.warnings = []
def validate_wirefilter_syntax(self, rule: str) -> bool:
"""Basic wirefilter syntax validation"""
# Check for common syntax errors
if rule.count('(') != rule.count(')'):
return False
if rule.count('{') != rule.count('}'):
return False
if rule.count('"') % 2 != 0:
return False
# Check for valid operators
valid_operators = [
'eq', '==', 'ne', '!=', 'gt', '>', 'lt', '<', 'ge', '>=', 'le', '<=',
'contains', 'starts_with', 'ends_with', 'matches', 'in', 'not in',
'and', 'or', 'not'
]
return True # More sophisticated validation would check actual syntax
def validate_rule(self, rule_name: str, rule_config: Dict[str, Any]) -> List[str]:
"""Validate individual rule configuration"""
errors = []
# Required fields
if not rule_config.get('name'):
errors.append(f"Rule '{rule_name}' missing 'name' field")
if not rule_config.get('wirefilter_rule'):
errors.append(f"Rule '{rule_name}' missing 'wirefilter_rule' field")
else:
if not self.validate_wirefilter_syntax(rule_config['wirefilter_rule']):
errors.append(f"Rule '{rule_name}' has invalid wirefilter syntax")
# Optional but recommended fields
if not rule_config.get('description'):
self.warnings.append(f"Rule '{rule_name}' missing description")
return errors
def validate_policy(self, policy_name: str, policy_config: Dict[str, Any]) -> List[str]:
"""Validate policy configuration"""
errors = []
required_fields = ['name', 'action']
for field in required_fields:
if not policy_config.get(field):
errors.append(f"Policy '{policy_name}' missing '{field}' field")
# Validate action values
valid_actions = ['allow', 'block', 'bypass', 'serviceauth', 'logingate']
if policy_config.get('action') not in valid_actions:
errors.append(f"Policy '{policy_name}' has invalid action. Valid actions: {valid_actions}")
return errors
def validate_list(self, list_name: str, list_config: Dict[str, Any]) -> List[str]:
"""Validate list configuration"""
errors = []
if not list_config.get('name'):
errors.append(f"List '{list_name}' missing 'name' field")
if not list_config.get('type'):
errors.append(f"List '{list_name}' missing 'type' field")
else:
valid_types = ['ip', 'string', 'int']
if list_config['type'] not in valid_types:
errors.append(f"List '{list_name}' has invalid type. Valid types: {valid_types}")
return errors
def validate_ip_addresses(self, ips: List[str]) -> List[str]:
"""Validate IP addresses and CIDR blocks"""
errors = []
for ip in ips:
# Basic IP/CIDR validation
if not re.match(r'^(\d{1,3}\.){3}\d{1,3}(/\d{1,2})?$', ip):
errors.append(f"Invalid IP address or CIDR block: {ip}")
return errors
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate complete configuration"""
self.errors = []
self.warnings = []
# Validate rules
for rule_name, rule_config in config.get('rules', {}).items():
self.errors.extend(self.validate_rule(rule_name, rule_config))
# Validate policies
for policy_name, policy_config in config.get('policies', {}).items():
self.errors.extend(self.validate_policy(policy_name, policy_config))
# Validate lists
for list_name, list_config in config.get('lists', {}).items():
self.errors.extend(self.validate_list(list_name, list_config))
# Validate IP addresses if it's an IP list
if list_config.get('type') == 'ip' and 'ips' in list_config:
self.errors.extend(self.validate_ip_addresses(list_config['ips']))
return len(self.errors) == 0
def print_results(self):
"""Print validation results"""
if self.errors:
print("❌ Validation Errors:")
for error in self.errors:
print(f" • {error}")
if self.warnings:
print("⚠️ Validation Warnings:")
for warning in self.warnings:
print(f" • {warning}")
if not self.errors and not self.warnings:
print("✅ Configuration validation passed with no issues")
elif not self.errors:
print("✅ Configuration validation passed with warnings")
def main():
if len(sys.argv) != 2:
print("Usage: python validate.py
config_file = sys.argv[1]
try:
with open(config_file, 'r') as f:
config = json.load(f)
except FileNotFoundError:
print(f"❌ Configuration file not found: {config_file}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"❌ Invalid JSON in configuration file: {e}")
sys.exit(1)
validator = ConfigValidator()
is_valid = validator.validate_config(config)
validator.print_results()
if not is_valid:
sys.exit(1)
if __name__ == "__main__":
main()
```
CI/CD Pipeline Integration¶
GitHub Actions Workflow¶
View pipeline: .github/workflows/deploy.yml
```yaml name: Deploy Peakhour Configuration
on: push: branches: [main, staging, dev] pull_request: branches: [main]
env: PEAKHOUR_API_BASE_URL: https://api.peakhour.io
jobs: validate: runs-on: ubuntu-latest steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install requests
- name: Validate Configuration
run: |
python scripts/validate/validate.py environments/dev/config.json
python scripts/validate/validate.py environments/staging/config.json
python scripts/validate/validate.py environments/prod/config.json
deploy-dev: needs: validate runs-on: ubuntu-latest if: github.ref == 'refs/heads/dev' environment: development
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Deploy to Development
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_DEV }}
DOMAIN: dev.example.com
ENVIRONMENT: dev
COMMIT_MESSAGE: "[dev] ${{ github.event.head_commit.message }}"
run: python scripts/deploy/deploy.py
deploy-staging: needs: validate runs-on: ubuntu-latest if: github.ref == 'refs/heads/staging' environment: staging
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Deploy to Staging
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_STAGING }}
DOMAIN: staging.example.com
ENVIRONMENT: staging
COMMIT_MESSAGE: "[staging] ${{ github.event.head_commit.message }}"
ROLLBACK_ON_ERROR: true
run: python scripts/deploy/deploy.py
deploy-production: needs: validate runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' environment: production
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Deploy to Production
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
DOMAIN: example.com
ENVIRONMENT: prod
COMMIT_MESSAGE: "[prod] ${{ github.event.head_commit.message }}"
ROLLBACK_ON_ERROR: true
run: python scripts/deploy/deploy.py
- name: Health Check
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
DOMAIN: example.com
run: |
# Wait for deployment to propagate
sleep 30
# Basic health check
curl -f "https://example.com/health" || exit 1
# Check Peakhour API health
curl -f "https://api.peakhour.io/domains/example.com/health" \
-H "Authorization: Bearer $PEAKHOUR_API_TOKEN" || exit 1
rollback-production: runs-on: ubuntu-latest if: failure() && github.ref == 'refs/heads/main' needs: deploy-production
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Rollback Production
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
DOMAIN: example.com
run: |
python -c "
import requests
import os
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
# Get version history
url = f'https://api.peakhour.io/domains/{domain}/edge-access/config/history'
headers = {'Authorization': f'Bearer {api_token}'}
response = requests.get(url, headers=headers)
history = response.json()['history']
if len(history) > 1:
previous_version = history[1]['version']
# Rollback to previous version
rollback_url = f'https://api.peakhour.io/domains/{domain}/edge-access/config/rollback'
payload = {
'version': previous_version,
'message': 'Emergency rollback due to deployment failure',
'commit': True
}
rollback_response = requests.post(rollback_url, json=payload, headers=headers)
rollback_response.raise_for_status()
print(f'✅ Rolled back to version {previous_version}')
else:
print('❌ No previous version available for rollback')
"
```
GitLab CI/CD Pipeline¶
View pipeline: .gitlab-ci.yml
```yaml stages:
- validate
-
deploy-dev
-
deploy-staging
- deploy-prod
variables: PEAKHOUR_API_BASE_URL: https://api.peakhour.io
validate-config: stage: validate image: python:3.9 before_script:
- pip install requests
script:
- python scripts/validate/validate.py environments/dev/config.json
- python scripts/validate/validate.py environments/staging/config.json
- python scripts/validate/validate.py environments/prod/config.json
only:
- merge_requests
- main
- staging
- dev
deploy-development: stage: deploy-dev image: python:3.9 before_script:
- pip install requests
script:
- python scripts/deploy/deploy.py
environment: name: development url: https://dev.example.com variables: DOMAIN: dev.example.com ENVIRONMENT: dev COMMIT_MESSAGE: "[dev] ${CI_COMMIT_MESSAGE}" only:
- dev
deploy-staging: stage: deploy-staging image: python:3.9 before_script:
- pip install requests
script:
- python scripts/deploy/deploy.py
environment: name: staging url: https://staging.example.com variables: DOMAIN: staging.example.com ENVIRONMENT: staging COMMIT_MESSAGE: "[staging] ${CI_COMMIT_MESSAGE}" ROLLBACK_ON_ERROR: "true" only:
- staging
deploy-production: stage: deploy-prod image: python:3.9 before_script:
- pip install requests
script:
- python scripts/deploy/deploy.py
environment: name: production url: https://example.com variables: DOMAIN: example.com ENVIRONMENT: prod COMMIT_MESSAGE: "[prod] ${CI_COMMIT_MESSAGE}" ROLLBACK_ON_ERROR: "true" when: manual - main ```
Configuration Modules and Templates¶
Security Module Template¶
modules/security/baseline-security.json
:
{
"name": "Security Baseline Module",
"description": "Standard security rules for all environments",
"rules": {
"block_tor_traffic": {
"name": "Block Tor Network Traffic",
"description": "Block requests from Tor exit nodes",
"wirefilter_rule": "ip.src in $tor_exit_nodes"
},
"rate_limit_api": {
"name": "API Rate Limiting",
"description": "Rate limit API endpoints",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and rate(1m) > 100"
},
"block_malicious_uas": {
"name": "Block Malicious User Agents",
"description": "Block known malicious user agents",
"wirefilter_rule": "lower(http.user_agent) matches \".*(sqlmap|nmap|masscan|zap).*\""
}
},
"lists": {
"tor_exit_nodes": {
"name": "Tor Exit Nodes",
"type": "ip",
"ips": ["185.220.100.0/24", "185.220.101.0/24"]
}
}
}
Performance Module Template¶
modules/performance/caching-optimization.json
:
{
"name": "Caching Optimization Module",
"description": "Performance optimization through intelligent caching",
"rules": {
"long_cache_static": {
"name": "Long Cache for Static Assets",
"description": "Cache static assets for extended periods",
"wirefilter_rule": "ends_with(http.request.uri.path, \".css\") or ends_with(http.request.uri.path, \".js\") or ends_with(http.request.uri.path, \".png\")"
},
"no_cache_dynamic": {
"name": "No Cache for Dynamic Content",
"description": "Prevent caching of dynamic content",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") or contains(http.request.uri.path, \"/admin/\")"
}
}
}
Monitoring and Observability¶
Configuration Drift Detection¶
import json import requests import sys from typing import Dict, Any
def detect_configuration_drift(api_token: str, domain: str, expected_config_path: str): """Detect drift between expected and actual configuration"""
# Load expected configuration
with open(expected_config_path, 'r') as f:
expected_config = json.load(f)
# Get actual configuration from Peakhour
url = f"https://api.peakhour.io/domains/{domain}/edge-access/config/state"
headers = {"Authorization": f"Bearer {api_token}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
actual_config = response.json()['state']
# Compare configurations
drift_detected = False
# Check rules
expected_rules = expected_config.get('rules', {})
actual_rules = actual_config.get('rules', {})
for rule_name, expected_rule in expected_rules.items():
if rule_name not in actual_rules:
print(f"❌ Missing rule: {rule_name}")
drift_detected = True
elif actual_rules[rule_name] != expected_rule:
print(f"❌ Rule drift detected: {rule_name}")
print(f" Expected: {expected_rule}")
print(f" Actual: {actual_rules[rule_name]}")
drift_detected = True
# Check for unexpected rules
for rule_name in actual_rules:
if rule_name not in expected_rules:
print(f"⚠️ Unexpected rule found: {rule_name}")
drift_detected = True
return not drift_detected
if name == "main": import os
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
config_path = sys.argv[1] if len(sys.argv) > 1 else 'environments/prod/config.json'
if detect_configuration_drift(api_token, domain, config_path):
print("✅ No configuration drift detected")
sys.exit(0)
else:
print("❌ Configuration drift detected")
sys.exit(1)
```
Deployment Monitoring¶
import requests import time import sys from typing import Dict, Any
def monitor_deployment_health(api_token: str, domain: str, timeout: int = 300): """Monitor deployment health after configuration changes"""
url = f"https://api.peakhour.io/domains/{domain}/analytics/overview"
headers = {"Authorization": f"Bearer {api_token}"}
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
# Check key metrics
error_rate = data.get('kpis', {}).get('origin_error_rate', 0)
threats_blocked = data.get('kpis', {}).get('total_threats_blocked', 0)
print(f"🔍 Health Check - Error Rate: {error_rate}%, Threats Blocked: {threats_blocked}")
# Alert conditions
if error_rate > 5.0: # More than 5% error rate
print(f"⚠️ High error rate detected: {error_rate}%")
return False
# If we've made it 5 minutes without issues, consider it healthy
if time.time() - start_time > 300:
print("✅ Deployment appears healthy")
return True
except requests.exceptions.RequestException as e:
print(f"❌ Health check failed: {e}")
return False
time.sleep(30) # Check every 30 seconds
print("⏰ Health check timeout")
return False
if name == "main": import os
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
if monitor_deployment_health(api_token, domain):
sys.exit(0)
else:
sys.exit(1)
```
This Infrastructure as Code approach enables you to manage Peakhour configurations with the same rigor as application code, providing version control, automated testing, rollback capabilities, and comprehensive monitoring for your edge security and performance configurations.