Skip to content

DevSecOps Security Automation with Peakhour

This guide demonstrates how to integrate Peakhour's security capabilities into your DevSecOps pipeline, enabling automated threat detection, security policy enforcement, and incident response throughout your development lifecycle.

Before you begin: Understand Infrastructure as Code, security investigation workflows, and have familiarity with security automation concepts.

DevSecOps Integration Architecture

Peakhour provides comprehensive APIs and automation capabilities that integrate seamlessly with modern DevSecOps toolchains to create a security-first development lifecycle.

Security Automation Components

Pipeline Integration Points:

  • Pre-deployment: Security policy validation and threat modeling
  • Deployment: Automated security rule deployment and configuration
  • Post-deployment: Continuous security monitoring and incident response
  • Feedback Loop: Security insights fed back into development process

Automated Security Capabilities:

  • Threat Detection: Real-time WAF and firewall event processing
  • Policy Enforcement: Automated security rule deployment
  • Incident Response: Automated threat mitigation and alerting
  • Compliance Monitoring: Continuous security posture assessment

Security Policy as Code

Security Configuration Templates

Create reusable security policy templates that enforce organizational security standards:

security-policies/templates/owasp-top10.json:

{
  "name": "OWASP Top 10 Protection",
  "description": "Comprehensive protection against OWASP Top 10 vulnerabilities", 
  "rules": {
    "sql_injection_protection": {
      "name": "SQL Injection Protection",
      "description": "Block SQL injection attempts",
      "wirefilter_rule": "waf.matched_rule.tags contains \"sql\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
    },
    "xss_protection": {
      "name": "Cross-Site Scripting Protection", 
      "description": "Block XSS attempts",
      "wirefilter_rule": "waf.matched_rule.tags contains \"xss\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
    },
    "rce_protection": {
      "name": "Remote Code Execution Protection",
      "description": "Block RCE attempts", 
      "wirefilter_rule": "waf.matched_rule.tags contains \"rce\" and waf.matched_rule.severity eq \"CRITICAL\""
    },
    "lfi_protection": {
      "name": "Local File Inclusion Protection",
      "description": "Block LFI attempts",
      "wirefilter_rule": "waf.matched_rule.tags contains \"lfi\" and waf.matched_rule.severity in {\"CRITICAL\", \"ERROR\"}"
    }
  }
}

security-policies/templates/api-security.json:

{
  "name": "API Security Baseline",
  "description": "Comprehensive API protection and rate limiting",
  "rules": {
    "api_rate_limiting": {
      "name": "API Rate Limiting",
      "description": "Rate limit API endpoints to prevent abuse",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and rate(1m) > 100"
    },
    "api_authentication_bypass": {
      "name": "API Authentication Bypass Detection",
      "description": "Detect attempts to bypass API authentication",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and not http.request.headers[\"authorization\"] exists"
    },
    "api_method_restriction": {
      "name": "API Method Restriction",
      "description": "Restrict dangerous HTTP methods on API endpoints",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/api/\") and http.request.method in {\"TRACE\", \"OPTIONS\", \"DELETE\"}"
    }
  },
  "lists": {
    "api_abuse_ips": {
      "name": "API Abuse IP Addresses",
      "type": "ip",
      "ips": []
    }
  }
}

Environment-Specific Security Policies

security-policies/environments/production-security.json:

{
  "name": "Production Security Policy",
  "description": "High-security policy for production environments",
  "rules": {
    "block_high_risk_countries": {
      "name": "Block High-Risk Countries",
      "description": "Block traffic from high-risk geographic regions",
      "wirefilter_rule": "ip.geoip.country in {\"CN\", \"RU\", \"KP\", \"IR\"} and not (ip.src in $trusted_partners)"
    },
    "admin_access_restriction": {
      "name": "Admin Access Restriction",
      "description": "Restrict admin access to authorized IPs only",
      "wirefilter_rule": "starts_with(http.request.uri.path, \"/admin\") and not (ip.src in $admin_ips)"
    },
    "threat_intelligence_blocking": {
      "name": "Threat Intelligence Blocking",
      "description": "Block IPs from threat intelligence feeds",
      "wirefilter_rule": "ip.src in $malicious_ips or ip.src in $webattacks"
    }
  },
  "lists": {
    "trusted_partners": {
      "name": "Trusted Partner IPs",
      "type": "ip", 
      "ips": ["203.0.113.0/24", "198.51.100.0/24"]
    },
    "admin_ips": {
      "name": "Administrator IP Whitelist",
      "type": "ip",
      "ips": ["192.168.1.0/24", "10.0.0.0/16"]
    }
  }
}

Automated Threat Detection and Response

Security Event Processing Pipeline

View script: scripts/security/threat-processor.py

```python

!/usr/bin/env python3

import json import requests import time from datetime import datetime, timedelta from typing import Dict, List, Any import hashlib

class ThreatProcessor: def init(self, api_token: str, domain: str): self.api_token = api_token self.domain = domain self.base_url = "https://api.peakhour.io"

def get_security_events(self, time_range: str = "1h") -> List[Dict[str, Any]]:
    """Retrieve security events from Peakhour"""
    url = f"{self.base_url}/domains/{self.domain}/firewall/events/list"
    headers = {"Authorization": f"Bearer {self.api_token}"}

    # Calculate time range
    end_time = datetime.now()
    if time_range == "1h":
        start_time = end_time - timedelta(hours=1)
    elif time_range == "24h":
        start_time = end_time - timedelta(days=1)
    else:
        start_time = end_time - timedelta(hours=1)

    params = {
        "from_date": start_time.isoformat(),
        "to_date": end_time.isoformat(),
        "event_type": "all"
    }

    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()

    return response.json()

def analyze_attack_patterns(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze attack patterns from security events"""

    # Group events by IP and attack type
    ip_attacks = {}
    attack_types = {}
    geographic_distribution = {}

    for event in events:
        ip = event.get('client', 'unknown')
        attack_type = event.get('by', 'unknown')
        country = event.get('geoip_country_code', 'unknown')
        severity = event.get('waf', {}).get('matched_rule', {}).get('severity', 'unknown')

        # Count attacks by IP
        if ip not in ip_attacks:
            ip_attacks[ip] = {
                'count': 0,
                'attack_types': set(),
                'countries': set(),
                'severities': set()
            }

        ip_attacks[ip]['count'] += 1
        ip_attacks[ip]['attack_types'].add(attack_type)
        ip_attacks[ip]['countries'].add(country)
        ip_attacks[ip]['severities'].add(severity)

        # Count attack types
        attack_types[attack_type] = attack_types.get(attack_type, 0) + 1

        # Geographic distribution
        geographic_distribution[country] = geographic_distribution.get(country, 0) + 1

    # Identify top attackers
    top_attackers = sorted(
        ip_attacks.items(), 
        key=lambda x: x[1]['count'], 
        reverse=True
    )[:10]

    # Identify coordinated attacks (same attack type from multiple IPs)
    coordinated_attacks = {}
    for ip, data in ip_attacks.items():
        for attack_type in data['attack_types']:
            if attack_type not in coordinated_attacks:
                coordinated_attacks[attack_type] = []
            coordinated_attacks[attack_type].append(ip)

    # Filter for coordinated attacks (3+ IPs with same attack type)
    coordinated_attacks = {
        k: v for k, v in coordinated_attacks.items() 
        if len(v) >= 3
    }

    return {
        'total_events': len(events),
        'unique_attackers': len(ip_attacks),
        'top_attackers': top_attackers,
        'attack_type_distribution': attack_types,
        'geographic_distribution': geographic_distribution,
        'coordinated_attacks': coordinated_attacks,
        'analysis_timestamp': datetime.now().isoformat()
    }

def generate_threat_intelligence(self, analysis: Dict[str, Any]) -> List[str]:
    """Generate actionable threat intelligence from analysis"""
    threats = []

    # High-frequency attackers
    for ip, data in analysis['top_attackers'][:5]:
        if data['count'] > 10:  # More than 10 attacks
            threats.append({
                'type': 'high_frequency_attacker',
                'ip': ip,
                'attack_count': data['count'],
                'attack_types': list(data['attack_types']),
                'recommendation': 'Add to IP blocklist'
            })

    # Coordinated attack campaigns
    for attack_type, ips in analysis['coordinated_attacks'].items():
        if len(ips) >= 5:  # 5+ IPs with same attack type
            threats.append({
                'type': 'coordinated_campaign',
                'attack_type': attack_type,
                'participating_ips': ips,
                'ip_count': len(ips),
                'recommendation': 'Implement geographic or ASN-based blocking'
            })

    # Geographic anomalies
    total_events = analysis['total_events']
    for country, count in analysis['geographic_distribution'].items():
        if count > total_events * 0.3:  # More than 30% from one country
            threats.append({
                'type': 'geographic_anomaly',
                'country': country,
                'event_count': count,
                'percentage': (count / total_events) * 100,
                'recommendation': 'Consider country-specific rate limiting'
            })

    return threats

def auto_mitigate_threats(self, threats: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Automatically mitigate identified threats"""
    mitigations = []

    for threat in threats:
        if threat['type'] == 'high_frequency_attacker':
            # Add to blocklist
            ip = threat['ip']
            if self.add_ip_to_blocklist(ip, f"Auto-blocked: {threat['attack_count']} attacks"):
                mitigations.append({
                    'type': 'ip_block',
                    'target': ip,
                    'status': 'success',
                    'timestamp': datetime.now().isoformat()
                })

        elif threat['type'] == 'coordinated_campaign':
            # Implement rate limiting for attack type
            attack_type = threat['attack_type']
            if self.implement_attack_type_rate_limit(attack_type):
                mitigations.append({
                    'type': 'rate_limit',
                    'target': attack_type,
                    'status': 'success', 
                    'timestamp': datetime.now().isoformat()
                })

    return mitigations

def add_ip_to_blocklist(self, ip: str, reason: str) -> bool:
    """Add IP to automatic blocklist"""
    try:
        # Get or create auto-blocklist
        list_name = "auto_threat_blocking"
        list_data = self.get_or_create_list(list_name, "ip", "Automatically blocked threat IPs")

        # Add IP to list
        url = f"{self.base_url}/domains/{self.domain}/edge-access/lists/{list_data['uuid']}"
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # Get current IPs and add new one
        current_ips = list_data.get('ips', [])
        if ip not in current_ips:
            current_ips.append(ip)

            payload = {"ips": current_ips}
            response = requests.patch(url, json=payload, headers=headers)
            response.raise_for_status()

            print(f"โœ… Added {ip} to auto-blocklist: {reason}")
            return True

        return True  # Already in list

    except Exception as e:
        print(f"โŒ Failed to add {ip} to blocklist: {e}")
        return False

def get_or_create_list(self, name: str, list_type: str, description: str) -> Dict[str, Any]:
    """Get existing list or create new one"""
    # Try to get existing list
    url = f"{self.base_url}/domains/{self.domain}/edge-access/lists"
    headers = {"Authorization": f"Bearer {self.api_token}"}

    response = requests.get(url, headers=headers)
    response.raise_for_status()

    lists = response.json()
    for lst in lists:
        if lst['name'] == name:
            return lst

    # Create new list
    payload = {
        "name": name,
        "type": list_type,
        "description": description
    }

    if list_type == "ip":
        payload["ips"] = []

    response = requests.post(url, json=payload, headers=headers)
    response.raise_for_status()

    return response.json()

def implement_attack_type_rate_limit(self, attack_type: str) -> bool:
    """Implement rate limiting for specific attack type"""
    try:
        # This would create a rule to rate limit specific attack patterns
        # Implementation depends on your specific rule management system
        print(f"๐Ÿ›ก๏ธ Implemented rate limiting for {attack_type} attacks")
        return True
    except Exception as e:
        print(f"โŒ Failed to implement rate limiting for {attack_type}: {e}")
        return False

def main(): import os import sys

api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')

if not api_token or not domain:
    print("โŒ Missing required environment variables")
    sys.exit(1)

processor = ThreatProcessor(api_token, domain)

print("๐Ÿ” Retrieving security events...")
events = processor.get_security_events("1h")

if not events:
    print("โœ… No security events in the last hour")
    return

print(f"๐Ÿ“Š Analyzing {len(events)} security events...")
analysis = processor.analyze_attack_patterns(events)

print(f"๐Ÿ” Found {analysis['unique_attackers']} unique attackers")
print(f"๐Ÿ” Attack type distribution: {analysis['attack_type_distribution']}")

# Generate threat intelligence
threats = processor.generate_threat_intelligence(analysis)

if threats:
    print(f"โš ๏ธ Identified {len(threats)} threats:")
    for threat in threats:
        print(f"   โ€ข {threat['type']}: {threat.get('recommendation', 'No recommendation')}")

    # Auto-mitigate critical threats
    print("๐Ÿ›ก๏ธ Auto-mitigating threats...")
    mitigations = processor.auto_mitigate_threats(threats)

    if mitigations:
        print(f"โœ… Applied {len(mitigations)} mitigations")
        for mitigation in mitigations:
            print(f"   โ€ข {mitigation['type']}: {mitigation['target']}")
else:
    print("โœ… No actionable threats identified")

if __name__ == "__main__":
    main()
```

Automated Incident Response

View script: scripts/security/incident-response.py

```python

!/usr/bin/env python3

import json import requests import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime from typing import Dict, List, Any

class IncidentResponse: def init(self, api_token: str, domain: str, config: Dict[str, Any]): self.api_token = api_token self.domain = domain self.config = config self.base_url = "https://api.peakhour.io"

def detect_security_incidents(self) -> List[Dict[str, Any]]:
    """Detect security incidents based on configured thresholds"""
    incidents = []

    # Get recent security events
    url = f"{self.base_url}/domains/{self.domain}/firewall/events/analytics"
    headers = {"Authorization": f"Bearer {self.api_token}"}

    response = requests.get(url, headers=headers)
    response.raise_for_status()
    analytics = response.json()

    # Check for incident conditions
    total_events = analytics.get('summary', {}).get('total_hits', 0)
    blocked_events = analytics.get('summary', {}).get('total_blocked', 0)

    # High attack volume incident
    if total_events > self.config.get('high_volume_threshold', 1000):
        incidents.append({
            'type': 'high_attack_volume',
            'severity': 'high',
            'metrics': {
                'total_events': total_events,
                'blocked_events': blocked_events
            },
            'description': f'High attack volume detected: {total_events} events in last hour'
        })

    # Low block rate incident (possible bypass)
    if total_events > 100:  # Only if we have significant traffic
        block_rate = (blocked_events / total_events) * 100 if total_events > 0 else 0
        if block_rate < self.config.get('min_block_rate', 80):
            incidents.append({
                'type': 'low_block_rate',
                'severity': 'medium',
                'metrics': {
                    'block_rate': block_rate,
                    'total_events': total_events
                },
                'description': f'Low block rate detected: {block_rate:.1f}% (threshold: {self.config.get("min_block_rate", 80)}%)'
            })

    return incidents

def execute_incident_response(self, incident: Dict[str, Any]) -> Dict[str, Any]:
    """Execute automated response for specific incident types"""
    response_actions = []

    if incident['type'] == 'high_attack_volume':
        # Implement emergency rate limiting
        if self.enable_emergency_rate_limiting():
            response_actions.append('emergency_rate_limiting_enabled')

        # Enable additional WAF rules
        if self.enable_enhanced_waf():
            response_actions.append('enhanced_waf_enabled')

    elif incident['type'] == 'low_block_rate':
        # Review and tighten security rules
        if self.review_security_rules():
            response_actions.append('security_rules_reviewed')

    # Always send notifications for incidents
    if self.send_incident_notification(incident):
        response_actions.append('notification_sent')

    return {
        'incident_id': self.generate_incident_id(incident),
        'timestamp': datetime.now().isoformat(),
        'actions_taken': response_actions,
        'status': 'responded'
    }

def enable_emergency_rate_limiting(self) -> bool:
    """Enable emergency rate limiting rules"""
    try:
        # This would implement emergency rate limiting
        # by creating or enabling stricter rate limit rules
        print("๐Ÿšจ Emergency rate limiting enabled")
        return True
    except Exception as e:
        print(f"โŒ Failed to enable emergency rate limiting: {e}")
        return False

def enable_enhanced_waf(self) -> bool:
    """Enable enhanced WAF protection"""
    try:
        # This would enable additional WAF rules or 
        # increase WAF sensitivity during incidents
        print("๐Ÿ›ก๏ธ Enhanced WAF protection enabled")
        return True
    except Exception as e:
        print(f"โŒ Failed to enable enhanced WAF: {e}")
        return False

def send_incident_notification(self, incident: Dict[str, Any]) -> bool:
    """Send incident notification to security team"""
    try:
        # Create incident report
        subject = f"๐Ÿšจ Security Incident Alert - {incident['type']} - {self.domain}"

        body = f"""

Security Incident Detected

Domain: {self.domain} Incident Type: {incident['type']} Severity: {incident['severity']} Time: {datetime.now().isoformat()}

Description: {incident['description']}

Metrics: {json.dumps(incident.get('metrics', {}), indent=2)}

Automated response has been initiated. Please review the incident and take additional action if necessary.

Peakhour Security Automation """

        # Send email notification
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.config.get('from_email', 'security@peakhour.io')
        msg['To'] = ', '.join(self.config.get('security_emails', []))

        # SMTP configuration from environment/config
        smtp_config = self.config.get('smtp', {})
        if smtp_config:
            with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
                if smtp_config.get('tls'):
                    server.starttls()
                if smtp_config.get('username'):
                    server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)

        # Also send to webhook if configured
        webhook_url = self.config.get('webhook_url')
        if webhook_url:
            webhook_payload = {
                'incident': incident,
                'domain': self.domain,
                'timestamp': datetime.now().isoformat()
            }
            requests.post(webhook_url, json=webhook_payload)

        return True

    except Exception as e:
        print(f"โŒ Failed to send incident notification: {e}")
        return False

def generate_incident_id(self, incident: Dict[str, Any]) -> str:
    """Generate unique incident ID"""
    import hashlib
    content = f"{self.domain}_{incident['type']}_{datetime.now().date().isoformat()}"
    return hashlib.md5(content.encode()).hexdigest()[:8]

def review_security_rules(self) -> bool:
    """Review and potentially tighten security rules"""
    try:
        # This would analyze current rules and suggest/implement improvements
        print("๐Ÿ” Security rules reviewed and optimized")
        return True
    except Exception as e:
        print(f"โŒ Failed to review security rules: {e}")
        return False

def main(): import os import sys

# Configuration
config = {
    'high_volume_threshold': int(os.getenv('HIGH_VOLUME_THRESHOLD', '1000')),
    'min_block_rate': float(os.getenv('MIN_BLOCK_RATE', '80.0')),
    'security_emails': os.getenv('SECURITY_EMAILS', '').split(',') if os.getenv('SECURITY_EMAILS') else [],
    'webhook_url': os.getenv('SECURITY_WEBHOOK_URL'),
    'smtp': {
        'host': os.getenv('SMTP_HOST'),
        'port': int(os.getenv('SMTP_PORT', '587')),
        'tls': os.getenv('SMTP_TLS', 'true').lower() == 'true',
        'username': os.getenv('SMTP_USERNAME'),
        'password': os.getenv('SMTP_PASSWORD')
    } if os.getenv('SMTP_HOST') else None
}

api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')

if not api_token or not domain:
    print("โŒ Missing required environment variables")
    sys.exit(1)

incident_responder = IncidentResponse(api_token, domain, config)

print("๐Ÿ” Detecting security incidents...")
incidents = incident_responder.detect_security_incidents()

if incidents:
    print(f"๐Ÿšจ Detected {len(incidents)} security incidents")

    for incident in incidents:
        print(f"   โ€ข {incident['type']}: {incident['severity']} severity")

        # Execute incident response
        response = incident_responder.execute_incident_response(incident)
        print(f"   โ€ข Response: {', '.join(response['actions_taken'])}")
else:
    print("โœ… No security incidents detected")

if __name__ == "__main__":
    main()
```

CI/CD Security Integration

Security-First Pipeline

View pipeline: .github/workflows/devsecops.yml

```yaml name: DevSecOps Security Pipeline

on: push: branches: [main, staging, dev] pull_request: branches: [main]

env: PEAKHOUR_API_BASE_URL: https://api.peakhour.io

jobs: security-validation: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3

- name: Set up Python
  uses: actions/setup-python@v4
  with:
    python-version: '3.9'

- name: Install security tools
  run: |
    pip install requests bandit safety semgrep

- name: Security Policy Validation
  run: |
    # Validate security configurations
    python scripts/validate/security-policy-validator.py

- name: Code Security Scan
  run: |
    # Scan for security vulnerabilities in deployment scripts
    bandit -r scripts/ -f json -o bandit-report.json || true

    # Check for known vulnerabilities in dependencies
    safety check --json --output safety-report.json || true

    # Static analysis for security issues
    semgrep --config=auto scripts/ --json --output=semgrep-report.json || true

- name: Upload Security Reports
  uses: actions/upload-artifact@v3
  with:
    name: security-reports
    path: |
      bandit-report.json
      safety-report.json
      semgrep-report.json

threat-modeling: runs-on: ubuntu-latest if: github.event_name == 'pull_request' steps: - uses: actions/checkout@v3

- name: Automated Threat Modeling
  run: |
    # Analyze configuration changes for security implications
    python scripts/security/threat-modeling.py \
      --changed-files="$(git diff --name-only origin/main...HEAD)" \
      --output="threat-model-report.json"

- name: Security Review Comment
  uses: actions/github-script@v6
  with:
    script: |
      const fs = require('fs');
      const report = JSON.parse(fs.readFileSync('threat-model-report.json'));

      const comment = `## ๐Ÿ”’ Security Analysis

      **Threat Assessment**: ${report.risk_level}

      **Identified Risks**:
      ${report.risks.map(r => `โ€ข ${r.description} (${r.severity})`).join('\n')}

      **Recommendations**:
      ${report.recommendations.map(r => `โ€ข ${r}`).join('\n')}
      `;

      github.rest.issues.createComment({
        issue_number: context.issue.number,
        owner: context.repo.owner,
        repo: context.repo.repo,
        body: comment
      });

security-testing: needs: security-validation runs-on: ubuntu-latest steps: - uses: actions/checkout@v3

- name: Set up Python
  uses: actions/setup-python@v4
  with:
    python-version: '3.9'

- name: Install dependencies
  run: pip install requests

- name: Security Configuration Test
  env:
    PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_TEST }}
    DOMAIN: test.example.com
  run: |
    # Test security rules in isolated environment
    python scripts/testing/security-rule-tester.py

- name: Penetration Testing
  run: |
    # Automated penetration testing against test environment
    python scripts/testing/automated-pentest.py \
      --target="https://test.example.com" \
      --report="pentest-report.json"

deploy-security: needs: [security-validation, security-testing] runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' environment: production

steps:
- uses: actions/checkout@v3

- name: Set up Python
  uses: actions/setup-python@v4
  with:
    python-version: '3.9'

- name: Install dependencies
  run: pip install requests

- name: Deploy Security Configuration
  env:
    PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
    DOMAIN: example.com
    ENVIRONMENT: prod
  run: |
    # Deploy with security-first approach
    python scripts/deploy/secure-deploy.py

- name: Post-Deployment Security Verification
  env:
    PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
    DOMAIN: example.com
  run: |
    # Verify security posture after deployment
    python scripts/security/security-verification.py

continuous-monitoring: runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - uses: actions/checkout@v3

- name: Set up Python
  uses: actions/setup-python@v4
  with:
    python-version: '3.9'

- name: Install dependencies
  run: pip install requests

- name: Enable Continuous Security Monitoring
  env:
    PEAKHOUR_API_TOKEN: ${{ secrets.PEAKHOUR_API_TOKEN_PROD }}
    DOMAIN: example.com
  run: |
    # Set up continuous monitoring and alerting
    python scripts/security/setup-monitoring.py
```

Automated Security Testing

View script: scripts/testing/security-rule-tester.py

```python

!/usr/bin/env python3

import requests import json import time from typing import Dict, List, Any

class SecurityRuleTester: def init(self, api_token: str, domain: str): self.api_token = api_token self.domain = domain self.base_url = "https://api.peakhour.io"

def test_sql_injection_protection(self) -> Dict[str, Any]:
    """Test SQL injection protection rules"""
    test_payloads = [
        "' OR '1'='1",
        "1' UNION SELECT * FROM users--",
        "'; DROP TABLE users; --",
        "admin'/**/OR/**/1=1/**/--"
    ]

    results = []
    for payload in test_payloads:
        test_url = f"https://{self.domain}/test?id={payload}"

        try:
            response = requests.get(test_url, timeout=10)

            # Should be blocked (403, 406, etc.) not 200
            if response.status_code == 200:
                results.append({
                    'payload': payload,
                    'status': 'failed',
                    'reason': f'SQL injection not blocked (HTTP {response.status_code})'
                })
            else:
                results.append({
                    'payload': payload,
                    'status': 'passed',
                    'reason': f'SQL injection blocked (HTTP {response.status_code})'
                })
        except requests.exceptions.RequestException as e:
            results.append({
                'payload': payload,
                'status': 'error',
                'reason': str(e)
            })

    passed = sum(1 for r in results if r['status'] == 'passed')
    return {
        'test_type': 'sql_injection_protection',
        'total_tests': len(results),
        'passed': passed,
        'success_rate': (passed / len(results)) * 100,
        'details': results
    }

def test_xss_protection(self) -> Dict[str, Any]:
    """Test XSS protection rules"""
    test_payloads = [
        "<script>alert('xss')</script>",
        "javascript:alert('xss')",
        "<img src=x onerror=alert('xss')>",
        "'><script>alert(String.fromCharCode(88,83,83))</script>"
    ]

    results = []
    for payload in test_payloads:
        test_url = f"https://{self.domain}/search?q={payload}"

        try:
            response = requests.get(test_url, timeout=10)

            if response.status_code == 200:
                # Check if payload is reflected without encoding
                if payload in response.text:
                    results.append({
                        'payload': payload,
                        'status': 'failed',
                        'reason': 'XSS payload reflected without protection'
                    })
                else:
                    results.append({
                        'payload': payload,
                        'status': 'passed',
                        'reason': 'XSS payload sanitized or blocked'
                    })
            else:
                results.append({
                    'payload': payload,
                    'status': 'passed',
                    'reason': f'XSS blocked (HTTP {response.status_code})'
                })
        except requests.exceptions.RequestException as e:
            results.append({
                'payload': payload,
                'status': 'error',
                'reason': str(e)
            })

    passed = sum(1 for r in results if r['status'] == 'passed')
    return {
        'test_type': 'xss_protection',
        'total_tests': len(results),
        'passed': passed,
        'success_rate': (passed / len(results)) * 100,
        'details': results
    }

def test_rate_limiting(self) -> Dict[str, Any]:
    """Test rate limiting configuration"""
    test_endpoint = f"https://{self.domain}/api/test"

    # Make rapid requests to trigger rate limiting
    responses = []
    for i in range(50):  # 50 rapid requests
        try:
            response = requests.get(test_endpoint, timeout=5)
            responses.append(response.status_code)
        except requests.exceptions.RequestException:
            responses.append(0)  # Connection error

        time.sleep(0.1)  # Brief delay between requests

    # Analyze responses
    blocked_responses = sum(1 for r in responses if r in [429, 403, 406])
    successful_responses = sum(1 for r in responses if r == 200)

    return {
        'test_type': 'rate_limiting',
        'total_requests': len(responses),
        'successful_requests': successful_responses,
        'blocked_requests': blocked_responses,
        'block_rate': (blocked_responses / len(responses)) * 100,
        'rate_limiting_effective': blocked_responses > 0
    }

def run_all_security_tests(self) -> Dict[str, Any]:
    """Run all security tests and compile results"""
    print("๐Ÿงช Running security rule tests...")

    tests = [
        self.test_sql_injection_protection(),
        self.test_xss_protection(), 
        self.test_rate_limiting()
    ]

    # Calculate overall results
    total_tests = sum(test.get('total_tests', test.get('total_requests', 0)) for test in tests)
    total_passed = sum(test.get('passed', 0) for test in tests)

    # Add rate limiting success
    if tests[2]['rate_limiting_effective']:
        total_passed += 1
        total_tests += 1

    return {
        'timestamp': time.time(),
        'domain': self.domain,
        'overall_success_rate': (total_passed / total_tests) * 100 if total_tests > 0 else 0,
        'test_results': tests,
        'security_posture': 'strong' if (total_passed / total_tests) > 0.8 else 'needs_improvement'
    }

def main(): import os import sys

api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')

if not api_token or not domain:
    print("โŒ Missing required environment variables")
    sys.exit(1)

tester = SecurityRuleTester(api_token, domain)
results = tester.run_all_security_tests()

print(f"๐Ÿ”’ Security Test Results for {domain}")
print(f"Overall Success Rate: {results['overall_success_rate']:.1f}%")
print(f"Security Posture: {results['security_posture']}")

for test in results['test_results']:
    test_type = test.get('test_type', 'unknown')
    if 'success_rate' in test:
        print(f"   โ€ข {test_type}: {test['success_rate']:.1f}% ({test['passed']}/{test['total_tests']})")
    elif 'rate_limiting_effective' in test:
        print(f"   โ€ข {test_type}: {'โœ… Effective' if test['rate_limiting_effective'] else 'โŒ Not Effective'}")

# Exit with error code if security posture is poor
if results['security_posture'] == 'needs_improvement':
    print("โŒ Security tests indicate improvements needed")
    sys.exit(1)
else:
    print("โœ… Security tests passed")

if __name__ == "__main__":
    main()
```

Compliance and Reporting

Automated Compliance Monitoring

View script: scripts/compliance/compliance-monitor.py

```python

!/usr/bin/env python3

import json import requests from datetime import datetime, timedelta from typing import Dict, List, Any

class ComplianceMonitor: def init(self, api_token: str, domain: str): self.api_token = api_token self.domain = domain self.base_url = "https://api.peakhour.io"

def check_owasp_compliance(self) -> Dict[str, Any]:
    """Check compliance with OWASP security standards"""

    compliance_checks = {
        'sql_injection_protection': False,
        'xss_protection': False,
        'csrf_protection': False,
        'security_headers': False,
        'rate_limiting': False,
        'authentication_protection': False
    }

    # Get current security configuration
    rules_url = f"{self.base_url}/domains/{self.domain}/edge-access/rules"
    headers = {"Authorization": f"Bearer {self.api_token}"}

    response = requests.get(rules_url, headers=headers)
    response.raise_for_status()
    rules = response.json()

    # Check for SQL injection protection
    for rule in rules:
        wirefilter_rule = rule.get('wirefilter_rule', '').lower()
        if 'sql' in wirefilter_rule or 'injection' in wirefilter_rule:
            compliance_checks['sql_injection_protection'] = True

        if 'xss' in wirefilter_rule:
            compliance_checks['xss_protection'] = True

        if 'rate(' in wirefilter_rule:
            compliance_checks['rate_limiting'] = True

        if 'auth' in wirefilter_rule or 'login' in wirefilter_rule:
            compliance_checks['authentication_protection'] = True

    # Calculate compliance score
    total_checks = len(compliance_checks)
    passed_checks = sum(1 for check in compliance_checks.values() if check)
    compliance_score = (passed_checks / total_checks) * 100

    return {
        'framework': 'OWASP',
        'compliance_score': compliance_score,
        'checks': compliance_checks,
        'recommendation': self.get_owasp_recommendations(compliance_checks)
    }

def check_pci_dss_compliance(self) -> Dict[str, Any]:
    """Check compliance with PCI DSS requirements"""

    compliance_checks = {
        'firewall_protection': False,
        'data_encryption': False,  # TLS enforcement
        'access_control': False,
        'network_monitoring': False,
        'security_logging': False,
        'vulnerability_scanning': False
    }

    # Check TLS enforcement
    domain_config_url = f"{self.base_url}/domains/{self.domain}/config"
    headers = {"Authorization": f"Bearer {self.api_token}"}

    try:
        response = requests.get(domain_config_url, headers=headers)
        config = response.json()

        # Check for HTTPS redirect
        if config.get('redirect_mode') in ['https', 'https-www', 'https-non-www']:
            compliance_checks['data_encryption'] = True

    except Exception:
        pass

    # Check firewall and monitoring
    analytics_url = f"{self.base_url}/domains/{self.domain}/firewall/events/analytics"
    try:
        response = requests.get(analytics_url, headers=headers)
        analytics = response.json()

        if analytics.get('summary', {}).get('total_blocked', 0) > 0:
            compliance_checks['firewall_protection'] = True
            compliance_checks['network_monitoring'] = True
            compliance_checks['security_logging'] = True

    except Exception:
        pass

    # Calculate compliance score
    total_checks = len(compliance_checks)
    passed_checks = sum(1 for check in compliance_checks.values() if check)
    compliance_score = (passed_checks / total_checks) * 100

    return {
        'framework': 'PCI DSS',
        'compliance_score': compliance_score,
        'checks': compliance_checks,
        'recommendation': self.get_pci_recommendations(compliance_checks)
    }

def get_owasp_recommendations(self, checks: Dict[str, bool]) -> List[str]:
    """Get recommendations for OWASP compliance"""
    recommendations = []

    if not checks['sql_injection_protection']:
        recommendations.append("Implement SQL injection protection rules")

    if not checks['xss_protection']:
        recommendations.append("Add cross-site scripting (XSS) protection")

    if not checks['rate_limiting']:
        recommendations.append("Configure rate limiting to prevent brute force attacks")

    if not checks['authentication_protection']:
        recommendations.append("Add authentication and authorization controls")

    return recommendations

def get_pci_recommendations(self, checks: Dict[str, bool]) -> List[str]:
    """Get recommendations for PCI DSS compliance"""
    recommendations = []

    if not checks['data_encryption']:
        recommendations.append("Enforce HTTPS/TLS encryption for all traffic")

    if not checks['firewall_protection']:
        recommendations.append("Enable comprehensive firewall protection")

    if not checks['network_monitoring']:
        recommendations.append("Implement continuous network monitoring")

    return recommendations

def generate_compliance_report(self) -> Dict[str, Any]:
    """Generate comprehensive compliance report"""

    owasp_compliance = self.check_owasp_compliance()
    pci_compliance = self.check_pci_dss_compliance()

    # Overall compliance score (weighted average)
    overall_score = (owasp_compliance['compliance_score'] * 0.6 + 
                    pci_compliance['compliance_score'] * 0.4)

    return {
        'report_date': datetime.now().isoformat(),
        'domain': self.domain,
        'overall_compliance_score': overall_score,
        'compliance_status': 'compliant' if overall_score >= 80 else 'needs_improvement',
        'frameworks': {
            'owasp': owasp_compliance,
            'pci_dss': pci_compliance
        },
        'priority_actions': self.get_priority_actions([owasp_compliance, pci_compliance])
    }

def get_priority_actions(self, compliance_results: List[Dict[str, Any]]) -> List[str]:
    """Get priority actions from all compliance frameworks"""
    all_recommendations = []

    for result in compliance_results:
        all_recommendations.extend(result.get('recommendation', []))

    # Remove duplicates and prioritize
    unique_recommendations = list(set(all_recommendations))

    # Simple prioritization (could be more sophisticated)
    priority_order = [
        "Enforce HTTPS/TLS encryption",
        "Implement SQL injection protection",
        "Add cross-site scripting (XSS) protection", 
        "Configure rate limiting",
        "Enable firewall protection"
    ]

    prioritized = []
    for priority_item in priority_order:
        for recommendation in unique_recommendations:
            if priority_item.lower() in recommendation.lower():
                prioritized.append(recommendation)
                break

    # Add remaining recommendations
    for recommendation in unique_recommendations:
        if recommendation not in prioritized:
            prioritized.append(recommendation)

    return prioritized[:5]  # Top 5 priority actions

def main(): import os import sys

api_token = os.getenv('PEAKHOUR_API_TOKEN')
domain = os.getenv('DOMAIN')

if not api_token or not domain:
    print("โŒ Missing required environment variables")
    sys.exit(1)

monitor = ComplianceMonitor(api_token, domain)
report = monitor.generate_compliance_report()

print(f"๐Ÿ“‹ Compliance Report for {domain}")
print(f"Overall Compliance Score: {report['overall_compliance_score']:.1f}%")
print(f"Status: {report['compliance_status']}")

print("\n๐Ÿ” Framework Scores:")
for framework, results in report['frameworks'].items():
    print(f"   โ€ข {results['framework']}: {results['compliance_score']:.1f}%")

if report['priority_actions']:
    print("\nโšก Priority Actions:")
    for i, action in enumerate(report['priority_actions'], 1):
        print(f"   {i}. {action}")

# Save report to file
with open(f"compliance-report-{datetime.now().strftime('%Y%m%d')}.json", 'w') as f:
    json.dump(report, f, indent=2)

# Exit with appropriate code
if report['compliance_status'] == 'needs_improvement':
    sys.exit(1)
else:
    sys.exit(0)

if __name__ == "__main__":
    main()
```

This comprehensive DevSecOps integration enables security automation throughout your development lifecycle, ensuring that security is not an afterthought but an integral part of your deployment process with continuous monitoring, automated threat response, and compliance validation.