DevSecOps Security Automation with Peakhour¶
This guide demonstrates how to integrate Peakhour's security capabilities into your DevSecOps pipeline, enabling automated threat detection, security policy enforcement, and incident response throughout your development lifecycle.
Before you begin: Understand Infrastructure as Code, security investigation workflows, and have familiarity with security automation concepts.
DevSecOps Integration Architecture¶
Peakhour provides comprehensive APIs and automation capabilities that integrate seamlessly with modern DevSecOps toolchains to create a security-first development lifecycle.
Security Automation Components¶
Pipeline Integration Points:
- Pre-deployment: Security policy validation and threat modeling
- Deployment: Automated security rule deployment and configuration
- Post-deployment: Continuous security monitoring and incident response
- Feedback Loop: Security insights fed back into development process
Automated Security Capabilities:
- Threat Detection: Real-time WAF and firewall event processing
- Policy Enforcement: Automated security rule deployment
- Incident Response: Automated threat mitigation and alerting
- Compliance Monitoring: Continuous security posture assessment
Security Policy as Code¶
Security Configuration Templates¶
Create reusable security policy templates that enforce organizational security standards:
security-policies/templates/owasp-top10.json
:
{
"name": "OWASP Top 10 Protection",
"description": "Comprehensive protection against OWASP Top 10 vulnerabilities",
"rules": {
"sql_injection_protection": {
"name": "SQL Injection Protection",
"description": "Block SQL injection attempts",
"wirefilter_rule": "waf.matched_rule.tags contains \"sql\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
},
"xss_protection": {
"name": "Cross-Site Scripting Protection",
"description": "Block XSS attempts",
"wirefilter_rule": "waf.matched_rule.tags contains \"xss\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
},
"rce_protection": {
"name": "Remote Code Execution Protection",
"description": "Block RCE attempts",
"wirefilter_rule": "waf.matched_rule.tags contains \"rce\" and waf.matched_rule.severity eq \"CRITICAL\""
},
"lfi_protection": {
"name": "Local File Inclusion Protection",
"description": "Block LFI attempts",
"wirefilter_rule": "waf.matched_rule.tags contains \"lfi\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
}
}
}
security-policies/templates/api-security.json
:
{
"name": "API Security Baseline",
"description": "Comprehensive API protection and rate limiting",
"rules": {
"api_rate_limiting": {
"name": "API Rate Limiting",
"description": "Rate limit API endpoints to prevent abuse",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and rate(1m) > 100"
},
"api_authentication_bypass": {
"name": "API Authentication Bypass Detection",
"description": "Detect attempts to bypass API authentication",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and not http.request.headers[\"authorization\"] exists"
},
"api_method_restriction": {
"name": "API Method Restriction",
"description": "Restrict dangerous HTTP methods on API endpoints",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and http.request.method in {\"TRACE\", \"OPTIONS\", \"DELETE\"}"
}
},
"lists": {
"api_abuse_ips": {
"name": "API Abuse IP Addresses",
"type": "ip",
"ips": []
}
}
}
Environment-Specific Security Policies¶
security-policies/environments/production-security.json
:
{
"name": "Production Security Policy",
"description": "High-security policy for production environments",
"rules": {
"block_high_risk_countries": {
"name": "Block High-Risk Countries",
"description": "Block traffic from high-risk geographic regions",
"wirefilter_rule": "ip.geoip.country in {\"CN\", \"RU\", \"KP\", \"IR\"} and not (ip.src in $trusted_partners)"
},
"admin_access_restriction": {
"name": "Admin Access Restriction",
"description": "Restrict admin access to authorized IPs only",
"wirefilter_rule": "starts_with(http.request.uri.path, \"/admin\") and not (ip.src in $admin_ips)"
},
"threat_intelligence_blocking": {
"name": "Threat Intelligence Blocking",
"description": "Block IPs from threat intelligence feeds",
"wirefilter_rule": "ip.src in $malicious_ips or ip.src in $webattacks"
}
},
"lists": {
"trusted_partners": {
"name": "Trusted Partner IPs",
"type": "ip",
"ips": ["203.0.113.0/24", "198.51.100.0/24"]
},
"admin_ips": {
"name": "Administrator IP Whitelist",
"type": "ip",
"ips": ["192.168.1.0/24", "10.0.0.0/16"]
}
}
}
Automated Threat Detection and Response¶
Security Event Processing Pipeline¶
import json import requests import time from datetime import datetime, timedelta from typing import Dict, List, Any import hashlib
class ThreatProcessor: def init(self, api_token: str, domain: str): self.api_token = api_token self.domain = domain self.base_url = "https://api.peakhour.io"
def get_security_events(self, time_range: str = "1h") -> List[Dict[str, Any]]:
"""Retrieve security events from Peakhour"""
url = f"{self.base_url}/domains/{self.domain}/firewall/events/list"
headers = {"Authorization": f"Bearer {self.api_token}"}
# Calculate time range
end_time = datetime.now()
if time_range == "1h":
start_time = end_time - timedelta(hours=1)
elif time_range == "24h":
start_time = end_time - timedelta(days=1)
else:
start_time = end_time - timedelta(hours=1)
params = {
"from_date": start_time.isoformat(),
"to_date": end_time.isoformat(),
"event_type": "all"
}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
def analyze_attack_patterns(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze attack patterns from security events"""
# Group events by IP and attack type
ip_attacks = {}
attack_types = {}
geographic_distribution = {}
for event in events:
ip = event.get('client', 'unknown')
attack_type = event.get('by', 'unknown')
country = event.get('geoip_country_code', 'unknown')
severity = event.get('waf', {}).get('matched_rule', {}).get('severity', 'unknown')
# Count attacks by IP
if ip not in ip_attacks:
ip_attacks[ip] = {
'count': 0,
'attack_types': set(),
'countries': set(),
'severities': set()
}
ip_attacks[ip]['count'] += 1
ip_attacks[ip]['attack_types'].add(attack_type)
ip_attacks[ip]['countries'].add(country)
ip_attacks[ip]['severities'].add(severity)
# Count attack types
attack_types[attack_type] = attack_types.get(attack_type, 0) + 1
# Geographic distribution
geographic_distribution[country] = geographic_distribution.get(country, 0) + 1
# Identify top attackers
top_attackers = sorted(
ip_attacks.items(),
key=lambda x: x[1]['count'],
reverse=True
)[:10]
# Identify coordinated attacks (same attack type from multiple IPs)
coordinated_attacks = {}
for ip, data in ip_attacks.items():
for attack_type in data['attack_types']:
if attack_type not in coordinated_attacks:
coordinated_attacks[attack_type] = []
coordinated_attacks[attack_type].append(ip)
# Filter for coordinated attacks (3+ IPs with same attack type)
coordinated_attacks = {
k: v for k, v in coordinated_attacks.items()
if len(v) >= 3
}
return {
'total_events': len(events),
'unique_attackers': len(ip_attacks),
'top_attackers': top_attackers,
'attack_type_distribution': attack_types,
'geographic_distribution': geographic_distribution,
'coordinated_attacks': coordinated_attacks,
'analysis_timestamp': datetime.now().isoformat()
}
def generate_threat_intelligence(self, analysis: Dict[str, Any]) -> List[str]:
"""Generate actionable threat intelligence from analysis"""
threats = []
# High-frequency attackers
for ip, data in analysis['top_attackers'][:5]:
if data['count'] > 10: # More than 10 attacks
threats.append({
'type': 'high_frequency_attacker',
'ip': ip,
'attack_count': data['count'],
'attack_types': list(data['attack_types']),
'recommendation': 'Add to IP blocklist'
})
# Coordinated attack campaigns
for attack_type, ips in analysis['coordinated_attacks'].items():
if len(ips) >= 5: # 5+ IPs with same attack type
threats.append({
'type': 'coordinated_campaign',
'attack_type': attack_type,
'participating_ips': ips,
'ip_count': len(ips),
'recommendation': 'Implement geographic or ASN-based blocking'
})
# Geographic anomalies
total_events = analysis['total_events']
for country, count in analysis['geographic_distribution'].items():
if count > total_events * 0.3: # More than 30% from one country
threats.append({
'type': 'geographic_anomaly',
'country': country,
'event_count': count,
'percentage': (count / total_events) * 100,
'recommendation': 'Consider country-specific rate limiting'
})
return threats
def auto_mitigate_threats(self, threats: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Automatically mitigate identified threats"""
mitigations = []
for threat in threats:
if threat['type'] == 'high_frequency_attacker':
# Add to blocklist
ip = threat['ip']
if self.add_ip_to_blocklist(ip, f"Auto-blocked: {threat['attack_count']} attacks"):
mitigations.append({
'type': 'ip_block',
'target': ip,
'status': 'success',
'timestamp': datetime.now().isoformat()
})
elif threat['type'] == 'coordinated_campaign':
# Implement rate limiting for attack type
attack_type = threat['attack_type']
if self.implement_attack_type_rate_limit(attack_type):
mitigations.append({
'type': 'rate_limit',
'target': attack_type,
'status': 'success',
'timestamp': datetime.now().isoformat()
})
return mitigations
def add_ip_to_blocklist(self, ip: str, reason: str) -> bool:
"""Add IP to automatic blocklist"""
try:
# Get or create auto-blocklist
list_name = "auto_threat_blocking"
list_data = self.get_or_create_list(list_name, "ip", "Automatically blocked threat IPs")
# Add IP to list
url = f"{self.base_url}/domains/{self.domain}/edge-access/lists/{list_data['uuid']}"
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
# Get current IPs and add new one
current_ips = list_data.get('ips', [])
if ip not in current_ips:
current_ips.append(ip)
payload = {"ips": current_ips}
response = requests.patch(url, json=payload, headers=headers)
response.raise_for_status()
print(f"โ
Added {ip} to auto-blocklist: {reason}")
return True
return True # Already in list
except Exception as e:
print(f"โ Failed to add {ip} to blocklist: {e}")
return False
def get_or_create_list(self, name: str, list_type: str, description: str) -> Dict[str, Any]:
"""Get existing list or create new one"""
# Try to get existing list
url = f"{self.base_url}/domains/{self.domain}/edge-access/lists"
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
lists = response.json()
for lst in lists:
if lst['name'] == name:
return lst
# Create new list
payload = {
"name": name,
"type": list_type,
"description": description
}
if list_type == "ip":
payload["ips"] = []
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json()
def implement_attack_type_rate_limit(self, attack_type: str) -> bool:
"""Implement rate limiting for specific attack type"""
try:
# This would create a rule to rate limit specific attack patterns
# Implementation depends on your specific rule management system
print(f"๐ก๏ธ Implemented rate limiting for {attack_type} attacks")
return True
except Exception as e:
print(f"โ Failed to implement rate limiting for {attack_type}: {e}")
return False
def main(): import os import sys
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
if not api_token or not domain:
print("โ Missing required environment variables")
sys.exit(1)
processor = ThreatProcessor(api_token, domain)
print("๐ Retrieving security events...")
events = processor.get_security_events("1h")
if not events:
print("โ
No security events in the last hour")
return
print(f"๐ Analyzing {len(events)} security events...")
analysis = processor.analyze_attack_patterns(events)
print(f"๐ Found {analysis['unique_attackers']} unique attackers")
print(f"๐ Attack type distribution: {analysis['attack_type_distribution']}")
# Generate threat intelligence
threats = processor.generate_threat_intelligence(analysis)
if threats:
print(f"โ ๏ธ Identified {len(threats)} threats:")
for threat in threats:
print(f" โข {threat['type']}: {threat.get('recommendation', 'No recommendation')}")
# Auto-mitigate critical threats
print("๐ก๏ธ Auto-mitigating threats...")
mitigations = processor.auto_mitigate_threats(threats)
if mitigations:
print(f"โ
Applied {len(mitigations)} mitigations")
for mitigation in mitigations:
print(f" โข {mitigation['type']}: {mitigation['target']}")
else:
print("โ
No actionable threats identified")
if __name__ == "__main__":
main()
```
Automated Incident Response¶
import json import requests import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime from typing import Dict, List, Any
class IncidentResponse: def init(self, api_token: str, domain: str, config: Dict[str, Any]): self.api_token = api_token self.domain = domain self.config = config self.base_url = "https://api.peakhour.io"
def detect_security_incidents(self) -> List[Dict[str, Any]]:
"""Detect security incidents based on configured thresholds"""
incidents = []
# Get recent security events
url = f"{self.base_url}/domains/{self.domain}/firewall/events/analytics"
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
analytics = response.json()
# Check for incident conditions
total_events = analytics.get('summary', {}).get('total_hits', 0)
blocked_events = analytics.get('summary', {}).get('total_blocked', 0)
# High attack volume incident
if total_events > self.config.get('high_volume_threshold', 1000):
incidents.append({
'type': 'high_attack_volume',
'severity': 'high',
'metrics': {
'total_events': total_events,
'blocked_events': blocked_events
},
'description': f'High attack volume detected: {total_events} events in last hour'
})
# Low block rate incident (possible bypass)
if total_events > 100: # Only if we have significant traffic
block_rate = (blocked_events / total_events) * 100 if total_events > 0 else 0
if block_rate < self.config.get('min_block_rate', 80):
incidents.append({
'type': 'low_block_rate',
'severity': 'medium',
'metrics': {
'block_rate': block_rate,
'total_events': total_events
},
'description': f'Low block rate detected: {block_rate:.1f}% (threshold: {self.config.get("min_block_rate", 80)}%)'
})
return incidents
def execute_incident_response(self, incident: Dict[str, Any]) -> Dict[str, Any]:
"""Execute automated response for specific incident types"""
response_actions = []
if incident['type'] == 'high_attack_volume':
# Implement emergency rate limiting
if self.enable_emergency_rate_limiting():
response_actions.append('emergency_rate_limiting_enabled')
# Enable additional WAF rules
if self.enable_enhanced_waf():
response_actions.append('enhanced_waf_enabled')
elif incident['type'] == 'low_block_rate':
# Review and tighten security rules
if self.review_security_rules():
response_actions.append('security_rules_reviewed')
# Always send notifications for incidents
if self.send_incident_notification(incident):
response_actions.append('notification_sent')
return {
'incident_id': self.generate_incident_id(incident),
'timestamp': datetime.now().isoformat(),
'actions_taken': response_actions,
'status': 'responded'
}
def enable_emergency_rate_limiting(self) -> bool:
"""Enable emergency rate limiting rules"""
try:
# This would implement emergency rate limiting
# by creating or enabling stricter rate limit rules
print("๐จ Emergency rate limiting enabled")
return True
except Exception as e:
print(f"โ Failed to enable emergency rate limiting: {e}")
return False
def enable_enhanced_waf(self) -> bool:
"""Enable enhanced WAF protection"""
try:
# This would enable additional WAF rules or
# increase WAF sensitivity during incidents
print("๐ก๏ธ Enhanced WAF protection enabled")
return True
except Exception as e:
print(f"โ Failed to enable enhanced WAF: {e}")
return False
def send_incident_notification(self, incident: Dict[str, Any]) -> bool:
"""Send incident notification to security team"""
try:
# Create incident report
subject = f"๐จ Security Incident Alert - {incident['type']} - {self.domain}"
body = f"""
Security Incident Detected
Domain: {self.domain} Incident Type: {incident['type']} Severity: {incident['severity']} Time: {datetime.now().isoformat()}
Description: {incident['description']}
Metrics: {json.dumps(incident.get('metrics', {}), indent=2)}
Automated response has been initiated. Please review the incident and take additional action if necessary.
Peakhour Security Automation """
# Send email notification
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.config.get('from_email', 'security@peakhour.io')
msg['To'] = ', '.join(self.config.get('security_emails', []))
# SMTP configuration from environment/config
smtp_config = self.config.get('smtp', {})
if smtp_config:
with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
if smtp_config.get('tls'):
server.starttls()
if smtp_config.get('username'):
server.login(smtp_config['username'], smtp_config['password'])
server.send_message(msg)
# Also send to webhook if configured
webhook_url = self.config.get('webhook_url')
if webhook_url:
webhook_payload = {
'incident': incident,
'domain': self.domain,
'timestamp': datetime.now().isoformat()
}
requests.post(webhook_url, json=webhook_payload)
return True
except Exception as e:
print(f"โ Failed to send incident notification: {e}")
return False
def generate_incident_id(self, incident: Dict[str, Any]) -> str:
"""Generate unique incident ID"""
import hashlib
content = f"{self.domain}_{incident['type']}_{datetime.now().date().isoformat()}"
return hashlib.md5(content.encode()).hexdigest()[:8]
def review_security_rules(self) -> bool:
"""Review and potentially tighten security rules"""
try:
# This would analyze current rules and suggest/implement improvements
print("๐ Security rules reviewed and optimized")
return True
except Exception as e:
print(f"โ Failed to review security rules: {e}")
return False
def main(): import os import sys
# Configuration
config = {
'high_volume_threshold': int(os.getenv('HIGH_VOLUME_THRESHOLD', '1000')),
'min_block_rate': float(os.getenv('MIN_BLOCK_RATE', '80.0')),
'security_emails': os.getenv('SECURITY_EMAILS', '').split(',') if os.getenv('SECURITY_EMAILS') else [],
'webhook_url': os.getenv('SECURITY_WEBHOOK_URL'),
'smtp': {
'host': os.getenv('SMTP_HOST'),
'port': int(os.getenv('SMTP_PORT', '587')),
'tls': os.getenv('SMTP_TLS', 'true').lower() == 'true',
'username': os.getenv('SMTP_USERNAME'),
'password': os.getenv('SMTP_PASSWORD')
} if os.getenv('SMTP_HOST') else None
}
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
if not api_token or not domain:
print("โ Missing required environment variables")
sys.exit(1)
incident_responder = IncidentResponse(api_token, domain, config)
print("๐ Detecting security incidents...")
incidents = incident_responder.detect_security_incidents()
if incidents:
print(f"๐จ Detected {len(incidents)} security incidents")
for incident in incidents:
print(f" โข {incident['type']}: {incident['severity']} severity")
# Execute incident response
response = incident_responder.execute_incident_response(incident)
print(f" โข Response: {', '.join(response['actions_taken'])}")
else:
print("โ
No security incidents detected")
if __name__ == "__main__":
main()
```
CI/CD Security Integration¶
Security-First Pipeline¶
View pipeline: .github/workflows/devsecops.yml
```yaml name: DevSecOps Security Pipeline
on: push: branches: [main, staging, dev] pull_request: branches: [main]
env: PEAKHOUR_API_BASE_URL: https://api.peakhour.io
jobs: security-validation: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install security tools
run: |
pip install requests bandit safety semgrep
- name: Security Policy Validation
run: |
# Validate security configurations
python scripts/validate/security-policy-validator.py
- name: Code Security Scan
run: |
# Scan for security vulnerabilities in deployment scripts
bandit -r scripts/ -f json -o bandit-report.json || true
# Check for known vulnerabilities in dependencies
safety check --json --output safety-report.json || true
# Static analysis for security issues
semgrep --config=auto scripts/ --json --output=semgrep-report.json || true
- name: Upload Security Reports
uses: actions/upload-artifact@v3
with:
name: security-reports
path: |
bandit-report.json
safety-report.json
semgrep-report.json
threat-modeling: runs-on: ubuntu-latest if: github.event_name == 'pull_request' steps: - uses: actions/checkout@v3
- name: Automated Threat Modeling
run: |
# Analyze configuration changes for security implications
python scripts/security/threat-modeling.py \
--changed-files="$(git diff --name-only origin/main...HEAD)" \
--output="threat-model-report.json"
- name: Security Review Comment
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const report = JSON.parse(fs.readFileSync('threat-model-report.json'));
const comment = `## ๐ Security Analysis
**Threat Assessment**: ${report.risk_level}
**Identified Risks**:
${report.risks.map(r => `โข ${r.description} (${r.severity})`).join('\n')}
**Recommendations**:
${report.recommendations.map(r => `โข ${r}`).join('\n')}
`;
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: comment
});
security-testing: needs: security-validation runs-on: ubuntu-latest steps: - uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Security Configuration Test
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_TEST }}
DOMAIN: test.example.com
run: |
# Test security rules in isolated environment
python scripts/testing/security-rule-tester.py
- name: Penetration Testing
run: |
# Automated penetration testing against test environment
python scripts/testing/automated-pentest.py \
--target="https://test.example.com" \
--report="pentest-report.json"
deploy-security: needs: [security-validation, security-testing] runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' environment: production
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Deploy Security Configuration
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
DOMAIN: example.com
ENVIRONMENT: prod
run: |
# Deploy with security-first approach
python scripts/deploy/secure-deploy.py
- name: Post-Deployment Security Verification
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
DOMAIN: example.com
run: |
# Verify security posture after deployment
python scripts/security/security-verification.py
continuous-monitoring: runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install requests
- name: Enable Continuous Security Monitoring
env:
PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
DOMAIN: example.com
run: |
# Set up continuous monitoring and alerting
python scripts/security/setup-monitoring.py
```
Automated Security Testing¶
import requests import json import time from typing import Dict, List, Any
class SecurityRuleTester: def init(self, api_token: str, domain: str): self.api_token = api_token self.domain = domain self.base_url = "https://api.peakhour.io"
def test_sql_injection_protection(self) -> Dict[str, Any]:
"""Test SQL injection protection rules"""
test_payloads = [
"' OR '1'='1",
"1' UNION SELECT * FROM users--",
"'; DROP TABLE users; --",
"admin'/**/OR/**/1=1/**/--"
]
results = []
for payload in test_payloads:
test_url = f"https://{self.domain}/test?id={payload}"
try:
response = requests.get(test_url, timeout=10)
# Should be blocked (403, 406, etc.) not 200
if response.status_code == 200:
results.append({
'payload': payload,
'status': 'failed',
'reason': f'SQL injection not blocked (HTTP {response.status_code})'
})
else:
results.append({
'payload': payload,
'status': 'passed',
'reason': f'SQL injection blocked (HTTP {response.status_code})'
})
except requests.exceptions.RequestException as e:
results.append({
'payload': payload,
'status': 'error',
'reason': str(e)
})
passed = sum(1 for r in results if r['status'] == 'passed')
return {
'test_type': 'sql_injection_protection',
'total_tests': len(results),
'passed': passed,
'success_rate': (passed / len(results)) * 100,
'details': results
}
def test_xss_protection(self) -> Dict[str, Any]:
"""Test XSS protection rules"""
test_payloads = [
"<script>alert('xss')</script>",
"javascript:alert('xss')",
"<img src=x onerror=alert('xss')>",
"'><script>alert(String.fromCharCode(88,83,83))</script>"
]
results = []
for payload in test_payloads:
test_url = f"https://{self.domain}/search?q={payload}"
try:
response = requests.get(test_url, timeout=10)
if response.status_code == 200:
# Check if payload is reflected without encoding
if payload in response.text:
results.append({
'payload': payload,
'status': 'failed',
'reason': 'XSS payload reflected without protection'
})
else:
results.append({
'payload': payload,
'status': 'passed',
'reason': 'XSS payload sanitized or blocked'
})
else:
results.append({
'payload': payload,
'status': 'passed',
'reason': f'XSS blocked (HTTP {response.status_code})'
})
except requests.exceptions.RequestException as e:
results.append({
'payload': payload,
'status': 'error',
'reason': str(e)
})
passed = sum(1 for r in results if r['status'] == 'passed')
return {
'test_type': 'xss_protection',
'total_tests': len(results),
'passed': passed,
'success_rate': (passed / len(results)) * 100,
'details': results
}
def test_rate_limiting(self) -> Dict[str, Any]:
"""Test rate limiting configuration"""
test_endpoint = f"https://{self.domain}/api/test"
# Make rapid requests to trigger rate limiting
responses = []
for i in range(50): # 50 rapid requests
try:
response = requests.get(test_endpoint, timeout=5)
responses.append(response.status_code)
except requests.exceptions.RequestException:
responses.append(0) # Connection error
time.sleep(0.1) # Brief delay between requests
# Analyze responses
blocked_responses = sum(1 for r in responses if r in [429, 403, 406])
successful_responses = sum(1 for r in responses if r == 200)
return {
'test_type': 'rate_limiting',
'total_requests': len(responses),
'successful_requests': successful_responses,
'blocked_requests': blocked_responses,
'block_rate': (blocked_responses / len(responses)) * 100,
'rate_limiting_effective': blocked_responses > 0
}
def run_all_security_tests(self) -> Dict[str, Any]:
"""Run all security tests and compile results"""
print("๐งช Running security rule tests...")
tests = [
self.test_sql_injection_protection(),
self.test_xss_protection(),
self.test_rate_limiting()
]
# Calculate overall results
total_tests = sum(test.get('total_tests', test.get('total_requests', 0)) for test in tests)
total_passed = sum(test.get('passed', 0) for test in tests)
# Add rate limiting success
if tests[2]['rate_limiting_effective']:
total_passed += 1
total_tests += 1
return {
'timestamp': time.time(),
'domain': self.domain,
'overall_success_rate': (total_passed / total_tests) * 100 if total_tests > 0 else 0,
'test_results': tests,
'security_posture': 'strong' if (total_passed / total_tests) > 0.8 else 'needs_improvement'
}
def main(): import os import sys
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
if not api_token or not domain:
print("โ Missing required environment variables")
sys.exit(1)
tester = SecurityRuleTester(api_token, domain)
results = tester.run_all_security_tests()
print(f"๐ Security Test Results for {domain}")
print(f"Overall Success Rate: {results['overall_success_rate']:.1f}%")
print(f"Security Posture: {results['security_posture']}")
for test in results['test_results']:
test_type = test.get('test_type', 'unknown')
if 'success_rate' in test:
print(f" โข {test_type}: {test['success_rate']:.1f}% ({test['passed']}/{test['total_tests']})")
elif 'rate_limiting_effective' in test:
print(f" โข {test_type}: {'โ
Effective' if test['rate_limiting_effective'] else 'โ Not Effective'}")
# Exit with error code if security posture is poor
if results['security_posture'] == 'needs_improvement':
print("โ Security tests indicate improvements needed")
sys.exit(1)
else:
print("โ
Security tests passed")
if __name__ == "__main__":
main()
```
Compliance and Reporting¶
Automated Compliance Monitoring¶
import json import requests from datetime import datetime, timedelta from typing import Dict, List, Any
class ComplianceMonitor: def init(self, api_token: str, domain: str): self.api_token = api_token self.domain = domain self.base_url = "https://api.peakhour.io"
def check_owasp_compliance(self) -> Dict[str, Any]:
"""Check compliance with OWASP security standards"""
compliance_checks = {
'sql_injection_protection': False,
'xss_protection': False,
'csrf_protection': False,
'security_headers': False,
'rate_limiting': False,
'authentication_protection': False
}
# Get current security configuration
rules_url = f"{self.base_url}/domains/{self.domain}/edge-access/rules"
headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.get(rules_url, headers=headers)
response.raise_for_status()
rules = response.json()
# Check for SQL injection protection
for rule in rules:
wirefilter_rule = rule.get('wirefilter_rule', '').lower()
if 'sql' in wirefilter_rule or 'injection' in wirefilter_rule:
compliance_checks['sql_injection_protection'] = True
if 'xss' in wirefilter_rule:
compliance_checks['xss_protection'] = True
if 'rate(' in wirefilter_rule:
compliance_checks['rate_limiting'] = True
if 'auth' in wirefilter_rule or 'login' in wirefilter_rule:
compliance_checks['authentication_protection'] = True
# Calculate compliance score
total_checks = len(compliance_checks)
passed_checks = sum(1 for check in compliance_checks.values() if check)
compliance_score = (passed_checks / total_checks) * 100
return {
'framework': 'OWASP',
'compliance_score': compliance_score,
'checks': compliance_checks,
'recommendation': self.get_owasp_recommendations(compliance_checks)
}
def check_pci_dss_compliance(self) -> Dict[str, Any]:
"""Check compliance with PCI DSS requirements"""
compliance_checks = {
'firewall_protection': False,
'data_encryption': False, # TLS enforcement
'access_control': False,
'network_monitoring': False,
'security_logging': False,
'vulnerability_scanning': False
}
# Check TLS enforcement
domain_config_url = f"{self.base_url}/domains/{self.domain}/config"
headers = {"Authorization": f"Bearer {self.api_token}"}
try:
response = requests.get(domain_config_url, headers=headers)
config = response.json()
# Check for HTTPS redirect
if config.get('redirect_mode') in ['https', 'https-www', 'https-non-www']:
compliance_checks['data_encryption'] = True
except Exception:
pass
# Check firewall and monitoring
analytics_url = f"{self.base_url}/domains/{self.domain}/firewall/events/analytics"
try:
response = requests.get(analytics_url, headers=headers)
analytics = response.json()
if analytics.get('summary', {}).get('total_blocked', 0) > 0:
compliance_checks['firewall_protection'] = True
compliance_checks['network_monitoring'] = True
compliance_checks['security_logging'] = True
except Exception:
pass
# Calculate compliance score
total_checks = len(compliance_checks)
passed_checks = sum(1 for check in compliance_checks.values() if check)
compliance_score = (passed_checks / total_checks) * 100
return {
'framework': 'PCI DSS',
'compliance_score': compliance_score,
'checks': compliance_checks,
'recommendation': self.get_pci_recommendations(compliance_checks)
}
def get_owasp_recommendations(self, checks: Dict[str, bool]) -> List[str]:
"""Get recommendations for OWASP compliance"""
recommendations = []
if not checks['sql_injection_protection']:
recommendations.append("Implement SQL injection protection rules")
if not checks['xss_protection']:
recommendations.append("Add cross-site scripting (XSS) protection")
if not checks['rate_limiting']:
recommendations.append("Configure rate limiting to prevent brute force attacks")
if not checks['authentication_protection']:
recommendations.append("Add authentication and authorization controls")
return recommendations
def get_pci_recommendations(self, checks: Dict[str, bool]) -> List[str]:
"""Get recommendations for PCI DSS compliance"""
recommendations = []
if not checks['data_encryption']:
recommendations.append("Enforce HTTPS/TLS encryption for all traffic")
if not checks['firewall_protection']:
recommendations.append("Enable comprehensive firewall protection")
if not checks['network_monitoring']:
recommendations.append("Implement continuous network monitoring")
return recommendations
def generate_compliance_report(self) -> Dict[str, Any]:
"""Generate comprehensive compliance report"""
owasp_compliance = self.check_owasp_compliance()
pci_compliance = self.check_pci_dss_compliance()
# Overall compliance score (weighted average)
overall_score = (owasp_compliance['compliance_score'] * 0.6 +
pci_compliance['compliance_score'] * 0.4)
return {
'report_date': datetime.now().isoformat(),
'domain': self.domain,
'overall_compliance_score': overall_score,
'compliance_status': 'compliant' if overall_score >= 80 else 'needs_improvement',
'frameworks': {
'owasp': owasp_compliance,
'pci_dss': pci_compliance
},
'priority_actions': self.get_priority_actions([owasp_compliance, pci_compliance])
}
def get_priority_actions(self, compliance_results: List[Dict[str, Any]]) -> List[str]:
"""Get priority actions from all compliance frameworks"""
all_recommendations = []
for result in compliance_results:
all_recommendations.extend(result.get('recommendation', []))
# Remove duplicates and prioritize
unique_recommendations = list(set(all_recommendations))
# Simple prioritization (could be more sophisticated)
priority_order = [
"Enforce HTTPS/TLS encryption",
"Implement SQL injection protection",
"Add cross-site scripting (XSS) protection",
"Configure rate limiting",
"Enable firewall protection"
]
prioritized = []
for priority_item in priority_order:
for recommendation in unique_recommendations:
if priority_item.lower() in recommendation.lower():
prioritized.append(recommendation)
break
# Add remaining recommendations
for recommendation in unique_recommendations:
if recommendation not in prioritized:
prioritized.append(recommendation)
return prioritized[:5] # Top 5 priority actions
def main(): import os import sys
api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')
if not api_token or not domain:
print("โ Missing required environment variables")
sys.exit(1)
monitor = ComplianceMonitor(api_token, domain)
report = monitor.generate_compliance_report()
print(f"๐ Compliance Report for {domain}")
print(f"Overall Compliance Score: {report['overall_compliance_score']:.1f}%")
print(f"Status: {report['compliance_status']}")
print("\n๐ Framework Scores:")
for framework, results in report['frameworks'].items():
print(f" โข {results['framework']}: {results['compliance_score']:.1f}%")
if report['priority_actions']:
print("\nโก Priority Actions:")
for i, action in enumerate(report['priority_actions'], 1):
print(f" {i}. {action}")
# Save report to file
with open(f"compliance-report-{datetime.now().strftime('%Y%m%d')}.json", 'w') as f:
json.dump(report, f, indent=2)
# Exit with appropriate code
if report['compliance_status'] == 'needs_improvement':
sys.exit(1)
else:
sys.exit(0)
if __name__ == "__main__":
main()
```
This comprehensive DevSecOps integration enables security automation throughout your development lifecycle, ensuring that security is not an afterthought but an integral part of your deployment process with continuous monitoring, automated threat response, and compliance validation.