Threat Hunting on the Wayback Machine [Python]

A Python tool that scans Wayback Machine archives for sensitive information, API keys, and potentially dangerous files. Features include retry logic, rate limiting, and async support.

👨‍💻 Coded with curiosity by cybercur@

Remember to use this tool responsibly and only scan domains you have permission to analyze. The Wayback Machine is a valuable resource, and we should respect its terms of service and rate limits.

๐Ÿ” Problem [Information Disclosure]

The Wayback Machine archives billions of web pages, including potentially sensitive information that should have been removed. This can include:

  • API keys and credentials
  • Configuration files
  • Source code
  • Executable files
  • Database dumps
  • Log files

💀 Why Is This Dangerous?

Risk                       Example
------------------------   -------------------------------------------
Credential Exposure        API keys in .env files
Source Code Leaks          Unintentionally published .py or .js files
Malware Distribution       Archived .exe or .dll files
Configuration Disclosure   Database credentials in config files
Sensitive Data Exposure    Log files containing user data

🧾 Wayback Machine Security Scanner

This tool is intended to identify sensitive information that might have been accidentally archived in the Wayback Machine. The scanner uses the Wayback Machine's CDX API to find archived URLs and then analyzes their content for potential security risks.
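
To make this concrete, here is a minimal standalone CDX query using the same parameters the scanner builds below (a sketch; example.com is a placeholder for a domain you are authorized to test):

import requests

# Query the CDX index for captures under a domain. With output=json, the
# first row of the response is a header listing the field names.
params = {
    'url': 'example.com',            # placeholder domain
    'matchType': 'domain',           # include subdomains
    'output': 'json',
    'fl': 'original,timestamp,statuscode,mimetype',
    'collapse': 'urlkey',            # one capture per unique URL
    'limit': 10,
}
resp = requests.get("http://web.archive.org/cdx/search/cdx", params=params)
rows = resp.json()
if rows:
    for original, timestamp, status, mime in rows[1:]:   # skip header row
        print(f"http://web.archive.org/web/{timestamp}/{original}")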

Key Features

  • Comprehensive File Detection: Scans for various sensitive file types
  • API Key Detection: Identifies multiple types of API keys and credentials
  • Retry Logic: Handles failed requests with exponential backoff
  • Rate Limiting: Respects Wayback Machine's rate limits
  • Async Support: Fast parallel processing with asyncio
  • Detailed Reporting: Generates JSON reports with findings

Source Code

#!/usr/bin/env python3
import asyncio
import aiohttp
import requests
import re
import json
import argparse
from urllib.parse import urlparse
from datetime import datetime
import time
import os
import threading
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class WaybackSecurityScanner:
    def __init__(
        self,
        domain: str,
        max_workers: int = 5,
        output_file: Optional[str] = None,
        api_only: bool = False,
        scan_files: bool = True,
        rate_limit: float = 1.0
    ):
        self.domain = domain
        self.cdx_api_url = "http://web.archive.org/cdx/search/cdx"
        self.max_workers = max_workers
        self.api_only = api_only
        self.scan_files = scan_files
        self.rate_limit = rate_limit
        self.last_request_time = 0
        self._rate_lock = threading.Lock()  # _rate_limit may run in worker threads
        
        # Set output file to desktop if not specified
        if output_file:
            self.output_file = output_file
        else:
            desktop = os.path.join(os.path.expanduser("~"), "OneDrive", "Desktop")
            if not os.path.exists(desktop):
                desktop = os.path.join(os.path.expanduser("~"), "Desktop")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_file = os.path.join(desktop, f"wayback_security_report_{timestamp}.json")
        
        # Sensitive file extensions to look for
        self.sensitive_extensions = [
            # Configuration files
            '.env', '.config', '.properties', '.ini', '.conf', '.cfg',
            # Source code files
            '.js', '.py', '.php', '.java', '.rb', '.go', '.cs',
            # Executables and binaries
            '.exe', '.dll', '.so', '.dylib', '.bin',
            # Database files
            '.sql', '.db', '.sqlite', '.mdb',
            # Log files
            '.log', '.txt',
            # Archive files
            '.zip', '.tar', '.gz', '.rar',
            # Certificate files
            '.pem', '.key', '.crt', '.cer',
            # Shell scripts
            '.sh', '.bat', '.ps1', '.cmd'
        ]
        
        # API key patterns to look for
        self.api_key_patterns = {
            'google_api': r'AIza[0-9A-Za-z-_]{35}',
            'openai_api': r'sk-[0-9A-Za-z]{32,}',
            'aws_access': r'AKIA[0-9A-Z]{16}',
            'aws_secret': r'[0-9A-Za-z/+]{40}',  # very broad; expect false positives
            'firebase': r'[0-9a-f]{32}-us[0-9]{1,2}',
            'github_token': r'ghp_[0-9A-Za-z]{36}',
            'slack_token': r'xox[baprs]-[0-9A-Za-z-]{10,48}',
            'stripe_key': r'sk_live_[0-9a-zA-Z]{24}',
            'twilio_key': r'SK[0-9a-fA-F]{32}',
            'mailgun_key': r'key-[0-9a-f]{32}',
            'generic_api_key': r'[0-9A-Za-z]{32,}'  # catch-all; intentionally noisy
        }
        
        # Common password patterns
        self.password_patterns = {
            'basic_auth': r'[a-zA-Z0-9._%+-]+:[a-zA-Z0-9._%+-]+@',
            'database_url': r'(postgres|mysql|mongodb)://[a-zA-Z0-9._%+-]+:[a-zA-Z0-9._%+-]+@',
            'jwt_token': r'eyJ[a-zA-Z0-9-_]+\.eyJ[a-zA-Z0-9-_]+\.([a-zA-Z0-9-_]+)'
        }

    def _rate_limit(self):
        """Enforce a minimum delay between requests (thread-safe)."""
        with self._rate_lock:
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            if time_since_last < self.rate_limit:
                time.sleep(self.rate_limit - time_since_last)
            self.last_request_time = time.time()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(requests.RequestException),
        reraise=True  # raise the final error so the caller can handle it
    )
    def get_archived_urls(self) -> List[List[str]]:
        """Fetch archived URLs from Wayback Machine CDX API."""
        self._rate_limit()
        
        # Create a list of all extensions we want to search for
        extensions = [ext.lstrip('.') for ext in self.sensitive_extensions]
        extension_pattern = '|'.join(extensions)
        
        params = {
            'url': self.domain,
            'matchType': 'domain',
            'output': 'json',
            'fl': 'original,timestamp,statuscode,mimetype',
            'filter': f'urlkey:.*\\.({extension_pattern})$',
            'collapse': 'urlkey',
            'limit': 10000
        }
        
        all_results = []
        
        # Exceptions must propagate for the @retry decorator above to fire;
        # the caller (scan) handles the final failure.
        print("Searching for sensitive files...")
        response = requests.get(self.cdx_api_url, params=params)
        response.raise_for_status()
        data = response.json()
        
        if len(data) > 1:
            results = data[1:]  # Skip header row
            all_results.extend(results)
            print(f"\nFound {len(results)} files with sensitive extensions")
            
            # Group and display results by extension
            files_by_ext = {}
            for result in results:
                url = result[0]
                ext = os.path.splitext(urlparse(url).path)[1].lower()
                if ext not in files_by_ext:
                    files_by_ext[ext] = []
                files_by_ext[ext].append(url)
            
            for ext, urls in sorted(files_by_ext.items()):
                print(f"\n{ext} files ({len(urls)}):")
                for url in urls:
                    print(f"  {url}")
        
        return all_results

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(aiohttp.ClientError),
        reraise=True  # raise the final error so the caller can handle it
    )
    async def get_archived_urls_async(self, session: aiohttp.ClientSession) -> List[List[str]]:
        """Fetch archived URLs from Wayback Machine CDX API asynchronously."""
        extensions = [ext.lstrip('.') for ext in self.sensitive_extensions]
        extension_pattern = '|'.join(extensions)
        
        params = {
            'url': self.domain,
            'matchType': 'domain',
            'output': 'json',
            'fl': 'original,timestamp,statuscode,mimetype',
            'filter': f'urlkey:.*\\.({extension_pattern})$',
            'collapse': 'urlkey',
            'limit': 10000
        }
        
        # Exceptions must propagate for the @retry decorator above to fire;
        # the caller (scan_async) handles the final failure.
        print("Searching for sensitive files...")
        async with session.get(self.cdx_api_url, params=params) as response:
            response.raise_for_status()
            data = await response.json()
            
            if len(data) > 1:
                results = data[1:]  # Skip header row
                print(f"\nFound {len(results)} files with sensitive extensions")
                
                # Group and display results by extension
                files_by_ext = {}
                for result in results:
                    url = result[0]
                    ext = os.path.splitext(urlparse(url).path)[1].lower()
                    if ext not in files_by_ext:
                        files_by_ext[ext] = []
                    files_by_ext[ext].append(url)
                
                for ext, urls in sorted(files_by_ext.items()):
                    print(f"\n{ext} files ({len(urls)}):")
                    for url in urls:
                        print(f"  {url}")
                
                return results
            return []

    def scan_content(self, url_data: List[str]) -> Dict[str, Any]:
        """Scan a single URL's content for sensitive information."""
        url, timestamp, status_code, mime_type = url_data
        result = {
            'url': url,
            'timestamp': timestamp,
            'status_code': status_code,
            'mime_type': mime_type,
            'wayback_url': f"http://web.archive.org/web/{timestamp}/{url}",
            'api_keys': [],
            'passwords': [],
            'sensitive_files': []
        }

        # Record the sensitive file (skipped when --no-files is set). Note:
        # os.path.splitext treats dotfiles such as ".env" as having no
        # extension, so fall back to the basename in that case.
        path = urlparse(url).path
        ext = os.path.splitext(path)[1].lower()
        if not ext and os.path.basename(path).startswith('.'):
            ext = os.path.basename(path).lower()
        if self.scan_files and ext in self.sensitive_extensions:
            result['sensitive_files'].append({
                'type': 'file_extension',
                'details': path,
                'extension': ext
            })
            print(f"Found sensitive file: {url}")

        try:
            self._rate_limit()  # throttle content requests as well as CDX queries
            wayback_url = f"http://web.archive.org/web/{timestamp}/{url}"
            print(f"\nScanning: {wayback_url}")
            
            response = requests.get(wayback_url, timeout=30)
            response.raise_for_status()
            
            # Try to get the content, handling both text and binary files
            try:
                content = response.text
                
                # Scan for API keys
                for key_type, pattern in self.api_key_patterns.items():
                    matches = re.finditer(pattern, content)
                    for match in matches:
                        found_key = match.group()
                        context = content[max(0, match.start()-50):min(len(content), match.end()+50)]
                        print(f"Found {key_type}: {found_key}")
                        print(f"Context: {context}\n")
                        result['api_keys'].append({
                            'type': key_type,
                            'key': found_key,
                            'context': context
                        })

                # Scan for passwords (skipped when --api-only is set)
                if not self.api_only:
                    for pass_type, pattern in self.password_patterns.items():
                        for match in re.finditer(pattern, content):
                            found_pass = match.group()
                            context = content[max(0, match.start()-50):min(len(content), match.end()+50)]
                            print(f"Found {pass_type}: {found_pass}")
                            print(f"Context: {context}\n")
                            result['passwords'].append({
                                'type': pass_type,
                                'value': found_pass,
                                'context': context
                            })

            except UnicodeDecodeError:
                print(f"Binary file detected: {url}")
                # For binary files, just record their presence
                pass

        except requests.RequestException as e:
            print(f"Error scanning {url}: {e}")
        except Exception as e:
            print(f"Unexpected error scanning {url}: {e}")
        
        return result

    async def scan_content_async(self, session: aiohttp.ClientSession, url_data: List[str]) -> Dict[str, Any]:
        """Scan a single URL's content for sensitive information asynchronously."""
        url, timestamp, status_code, mime_type = url_data
        result = {
            'url': url,
            'timestamp': timestamp,
            'status_code': status_code,
            'mime_type': mime_type,
            'wayback_url': f"http://web.archive.org/web/{timestamp}/{url}",
            'api_keys': [],
            'passwords': [],
            'sensitive_files': []
        }

        # Record the sensitive file (skipped when --no-files is set). Note:
        # os.path.splitext treats dotfiles such as ".env" as having no
        # extension, so fall back to the basename in that case.
        path = urlparse(url).path
        ext = os.path.splitext(path)[1].lower()
        if not ext and os.path.basename(path).startswith('.'):
            ext = os.path.basename(path).lower()
        if self.scan_files and ext in self.sensitive_extensions:
            result['sensitive_files'].append({
                'type': 'file_extension',
                'details': path,
                'extension': ext
            })
            print(f"Found sensitive file: {url}")

        try:
            wayback_url = f"http://web.archive.org/web/{timestamp}/{url}"
            print(f"\nScanning: {wayback_url}")
            
            async with session.get(wayback_url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                response.raise_for_status()
                
                try:
                    content = await response.text()
                    
                    # Scan for API keys
                    for key_type, pattern in self.api_key_patterns.items():
                        matches = re.finditer(pattern, content)
                        for match in matches:
                            found_key = match.group()
                            context = content[max(0, match.start()-50):min(len(content), match.end()+50)]
                            print(f"Found {key_type}: {found_key}")
                            print(f"Context: {context}\n")
                            result['api_keys'].append({
                                'type': key_type,
                                'key': found_key,
                                'context': context
                            })

                    # Scan for passwords (skipped when --api-only is set)
                    if not self.api_only:
                        for pass_type, pattern in self.password_patterns.items():
                            for match in re.finditer(pattern, content):
                                found_pass = match.group()
                                context = content[max(0, match.start()-50):min(len(content), match.end()+50)]
                                print(f"Found {pass_type}: {found_pass}")
                                print(f"Context: {context}\n")
                                result['passwords'].append({
                                    'type': pass_type,
                                    'value': found_pass,
                                    'context': context
                                })

                except UnicodeDecodeError:
                    print(f"Binary file detected: {url}")
                    # For binary files, just record their presence
                    pass

        except aiohttp.ClientError as e:
            print(f"Error scanning {url}: {e}")
        except Exception as e:
            print(f"Unexpected error scanning {url}: {e}")
        
        return result

    def generate_report(self, results: List[Dict[str, Any]]) -> str:
        """Generate a JSON report of findings."""
        report = {
            'domain': self.domain,
            'scan_date': datetime.now().isoformat(),
            'total_urls_scanned': len(results),
            'findings': results
        }
        
        # Ensure the directory exists (dirname is empty when output_file is a
        # bare filename, and os.makedirs("") would raise)
        output_dir = os.path.dirname(self.output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        
        with open(self.output_file, 'w') as f:
            json.dump(report, f, indent=2)
        
        return self.output_file

    async def scan_async(self) -> List[Dict[str, Any]]:
        """Main scanning function using async/await."""
        print(f"Starting security scan for domain: {self.domain}")
        
        async with aiohttp.ClientSession() as session:
            try:
                archived_urls = await self.get_archived_urls_async(session)
            except aiohttp.ClientError as e:
                print(f"Error fetching archived URLs: {e}")
                return []
            
            if not archived_urls:
                print("No archived URLs found for the domain.")
                return []

            print(f"Found {len(archived_urls)} archived URLs. Starting scan...")
            results = []
            
            # Bound concurrency to max_workers so we do not flood archive.org
            semaphore = asyncio.Semaphore(self.max_workers)

            async def bounded_scan(url_data):
                async with semaphore:
                    return await self.scan_content_async(session, url_data)

            tasks = [asyncio.create_task(bounded_scan(url_data)) for url_data in archived_urls]
            
            for completed_task in asyncio.as_completed(tasks):
                try:
                    result = await completed_task
                    if result['api_keys'] or result['passwords'] or result['sensitive_files']:
                        results.append(result)
                except Exception as e:
                    print(f"Error processing task: {e}")

            report_file = self.generate_report(results)
            self.print_summary(results, len(archived_urls))
            
            return results

    def scan(self) -> List[Dict[str, Any]]:
        """Main scanning function using synchronous requests."""
        print(f"Starting security scan for domain: {self.domain}")
        try:
            archived_urls = self.get_archived_urls()
        except requests.RequestException as e:
            print(f"Error fetching archived URLs: {e}")
            return []
        
        if not archived_urls:
            print("No archived URLs found for the domain.")
            return []

        print(f"Found {len(archived_urls)} archived URLs. Starting scan...")
        results = []

        # Honor --threads by scanning URLs in parallel with a thread pool
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.scan_content, url_data): url_data
                       for url_data in archived_urls}
            for future in as_completed(futures):
                try:
                    result = future.result()
                    if result['api_keys'] or result['passwords'] or result['sensitive_files']:
                        results.append(result)
                except Exception as e:
                    print(f"Error processing {futures[future][0]}: {e}")

        report_file = self.generate_report(results)
        self.print_summary(results, len(archived_urls))
        
        return results

    def print_summary(self, results: List[Dict[str, Any]], total_urls: int):
        """Print a summary of the scan results."""
        print(f"\nScan complete! Report saved to: {self.output_file}")
        
        # Group findings by type
        files_by_ext = {}
        api_keys_by_type = {}
        passwords_by_type = {}
        
        for r in results:
            # Group sensitive files
            for file in r['sensitive_files']:
                ext = file.get('extension', 'unknown')
                if ext not in files_by_ext:
                    files_by_ext[ext] = []
                files_by_ext[ext].append(r['url'])
            
            # Group API keys
            for key in r['api_keys']:
                key_type = key['type']
                if key_type not in api_keys_by_type:
                    api_keys_by_type[key_type] = []
                api_keys_by_type[key_type].append(r['url'])
            
            # Group passwords
            for pwd in r['passwords']:
                pwd_type = pwd['type']
                if pwd_type not in passwords_by_type:
                    passwords_by_type[pwd_type] = []
                passwords_by_type[pwd_type].append(r['url'])
        
        print("\nDetailed Summary:")
        print("================")
        
        print("\nSensitive Files Found:")
        for ext, urls in sorted(files_by_ext.items()):
            print(f"\n{ext} files ({len(urls)}):")
            for url in urls:
                print(f"  - {url}")
        
        print("\nAPI Keys Found:")
        for key_type, urls in sorted(api_keys_by_type.items()):
            print(f"\n{key_type} ({len(urls)}):")
            for url in urls:
                print(f"  - {url}")
        
        print("\nPasswords/Credentials Found:")
        for pwd_type, urls in sorted(passwords_by_type.items()):
            print(f"\n{pwd_type} ({len(urls)}):")
            for url in urls:
                print(f"  - {url}")
        
        print("\nStatistics:")
        print(f"- Total URLs scanned: {total_urls}")
        print(f"- Sensitive files found: {sum(len(urls) for urls in files_by_ext.values())}")
        print(f"- API keys found: {sum(len(urls) for urls in api_keys_by_type.values())}")
        print(f"- Passwords found: {sum(len(urls) for urls in passwords_by_type.values())}")

def main():
    parser = argparse.ArgumentParser(description='Scan Wayback Machine archives for sensitive information.')
    parser.add_argument('domain', nargs='?', help='Domain to scan')
    parser.add_argument('--threads', type=int, default=5, help='Number of concurrent threads (default: 5)')
    parser.add_argument('--output', help='Output file path')
    parser.add_argument('--api-only', action='store_true', help='Only scan for API keys')
    parser.add_argument('--no-files', action='store_true', help='Skip file extension scanning')
    parser.add_argument('--rate-limit', type=float, default=1.0, help='Rate limit in seconds between requests (default: 1.0)')
    parser.add_argument('--use-async', action='store_true', help='Use async/await for better performance')
    
    args = parser.parse_args()
    
    # If no domain is provided, ask for it interactively
    if not args.domain:
        print("\nWelcome to the Wayback Machine Security Scanner!")
        print("This tool will scan archived versions of a website for sensitive information.")
        print("Please enter the domain you want to scan (e.g., example.com):")
        args.domain = input("> ").strip()
        
        if not args.domain:
            print("Error: Domain is required.")
            return
    
    scanner = WaybackSecurityScanner(
        domain=args.domain,
        max_workers=args.threads,
        output_file=args.output,
        api_only=args.api_only,
        scan_files=not args.no_files,
        rate_limit=args.rate_limit
    )
    
    if args.use_async:
        asyncio.run(scanner.scan_async())
    else:
        scanner.scan()

if __name__ == "__main__":
    main()

Usage

Basic Usage

python wayback_security_scanner.py example.com
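
If --output is not given, the report is saved to your Desktop (or to the OneDrive Desktop on Windows) as wayback_security_report_<timestamp>.json.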

Advanced Options

# Async mode with bounded concurrency and a custom rate limit
python wayback_security_scanner.py example.com --use-async --threads 10 --rate-limit 0.5

# API-only scan with custom output
python wayback_security_scanner.py example.com --api-only --output results.json

# Skip file scanning
python wayback_security_scanner.py example.com --no-files
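
Programmatic Use

The class can also be used directly from Python (a minimal sketch; it assumes the script above is saved as wayback_security_scanner.py on your import path, and example.com is a placeholder for a domain you are authorized to test):

# Minimal sketch of programmatic use; the module name is an assumption
# based on the filename used in the CLI examples above.
from wayback_security_scanner import WaybackSecurityScanner

scanner = WaybackSecurityScanner(
    domain="example.com",        # placeholder domain
    max_workers=5,               # thread-pool size for the sync scan
    rate_limit=1.0,              # seconds between requests
    output_file="findings.json",
)
findings = scanner.scan()        # returns only URLs that produced findings
print(f"{len(findings)} URLs produced findings")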

Features

1. Comprehensive File Detection

  • Configuration files (.env, .config, .properties, etc.)
  • Source code files (.js, .py, .php, etc.)
  • Executables and binaries (.exe, .dll, .so, etc.)
  • Database files (.sql, .db, etc.)
  • Log files (.log, .txt)
  • Archive files (.zip, .tar, etc.)
  • Certificate files (.pem, .key, etc.)
  • Shell scripts (.sh, .bat, etc.)

2. API Key Detection

  • Google API keys
  • OpenAI API keys
  • AWS access keys
  • Firebase API keys
  • GitHub tokens
  • Slack tokens
  • Stripe keys
  • Twilio keys
  • Mailgun keys
  • Generic API keys

3. Password Pattern Detection

  • Basic authentication credentials
  • Database connection strings
  • JWT tokens
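
A quick way to sanity-check these detection patterns is to run them against deliberately fake inputs (a sketch; none of the strings below are real credentials):

import re

# Deliberately fake test strings -- none of these are real credentials.
samples = {
    'google_api': 'AIza' + 'A' * 35,
    'aws_access': 'AKIA' + 'A' * 16,
    'database_url': 'postgres://app_user:hunter2@db.internal:5432/app',
    'jwt_token': 'eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIn0.c2lnbmF0dXJl',
}
# The same patterns the scanner defines above.
patterns = {
    'google_api': r'AIza[0-9A-Za-z-_]{35}',
    'aws_access': r'AKIA[0-9A-Z]{16}',
    'database_url': r'(postgres|mysql|mongodb)://[a-zA-Z0-9._%+-]+:[a-zA-Z0-9._%+-]+@',
    'jwt_token': r'eyJ[a-zA-Z0-9-_]+\.eyJ[a-zA-Z0-9-_]+\.([a-zA-Z0-9-_]+)',
}
for name, text in samples.items():
    print(name, '->', bool(re.search(patterns[name], text)))  # all True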

4. Performance Features

  • Multi-threaded scanning
  • Async/await support
  • Rate limiting
  • Retry logic with exponential backoff

5. Reporting

  • Detailed JSON reports
  • Context around found credentials
  • Summary statistics
  • Organized findings by type
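
An abridged report looks roughly like this (illustrative placeholder values only):

{
  "domain": "example.com",
  "scan_date": "2024-01-15T12:00:00",
  "total_urls_scanned": 42,
  "findings": [
    {
      "url": "http://example.com/backup/config.env",
      "timestamp": "20200101000000",
      "status_code": "200",
      "mime_type": "text/plain",
      "wayback_url": "http://web.archive.org/web/20200101000000/http://example.com/backup/config.env",
      "api_keys": [
        {"type": "aws_access", "key": "AKIAXXXXXXXXXXXXXXXX", "context": "...AWS_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX..."}
      ],
      "passwords": [],
      "sensitive_files": [
        {"type": "file_extension", "details": "/backup/config.env", "extension": ".env"}
      ]
    }
  ]
}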

Requirements

pip install -r requirements.txt

Requirements file:

requests==2.31.0
tenacity==8.2.3
aiohttp==3.9.1

argparse and typing ship with the Python standard library, so they need no entries here.

All rights reserved.