Threat Hunting on the Wayback Machine [Python]
A Python tool that scans Wayback Machine archives for sensitive information, API keys, and potentially dangerous files. Features include retry logic, rate limiting, and async support.
Coded with curiosity by cybercur@
Remember to use this tool responsibly and only scan domains you have permission to analyze. The Wayback Machine is a valuable resource, and we should respect its terms of service and rate limits.
Problem [Information Disclosure]
The Wayback Machine archives billions of web pages, including potentially sensitive information that should have been removed. This can include:
- API keys and credentials
- Configuration files
- Source code
- Executable files
- Database dumps
- Log files
Why Is This Dangerous?
| Risk | Example |
|---|---|
| Credential Exposure | API keys in .env files |
| Source Code Leaks | Unintentionally published .py or .js files |
| Malware Distribution | Archived .exe or .dll files |
| Configuration Disclosure | Database credentials in config files |
| Sensitive Data Exposure | Log files containing user data |
Wayback Machine Security Scanner
This tool identifies sensitive information that may have been accidentally archived in the Wayback Machine. It uses the Wayback Machine's CDX API to find archived URLs, then fetches each snapshot and analyzes its content for potential security risks.
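For orientation, here is a minimal sketch of the kind of CDX query the scanner builds. It uses the public CDX endpoint and parameters that also appear in the source below; the domain is a placeholder:

```python
import requests

# A stripped-down CDX lookup: list a few archived URLs for a domain.
params = {
    'url': 'example.com',        # placeholder domain
    'matchType': 'domain',       # include subdomains
    'output': 'json',            # JSON rows instead of plain text
    'fl': 'original,timestamp',  # fields to return
    'limit': 5,
}
rows = requests.get('http://web.archive.org/cdx/search/cdx',
                    params=params, timeout=30).json()
for original, timestamp in rows[1:]:  # first row is the field header
    print(f"http://web.archive.org/web/{timestamp}/{original}")
```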
Key Features
- Comprehensive File Detection: Scans for various sensitive file types
- API Key Detection: Identifies multiple types of API keys and credentials
- Retry Logic: Handles failed requests with exponential backoff
- Rate Limiting: Respects Wayback Machine's rate limits
- Async Support: Fast parallel processing with asyncio
- Detailed Reporting: Generates JSON reports with findings
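Besides the CLI shown later, the scanner class can be driven directly from Python. A minimal sketch, assuming the source below is saved as wayback_security_scanner.py:

```python
from wayback_security_scanner import WaybackSecurityScanner  # hypothetical module name

scanner = WaybackSecurityScanner(domain="example.com", rate_limit=2.0)
findings = scanner.scan()  # synchronous mode; returns only URLs with findings
print(f"{len(findings)} archived URLs produced findings")
print(f"Report written to {scanner.output_file}")
```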
Source Code
```python
#!/usr/bin/env python3
import asyncio
import aiohttp
import requests
import re
import json
import argparse
from urllib.parse import urlparse
from datetime import datetime
import time
import os
from typing import List, Dict, Any, Optional
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class WaybackSecurityScanner:
    def __init__(
        self,
        domain: str,
        max_workers: int = 5,
        output_file: Optional[str] = None,
        api_only: bool = False,
        scan_files: bool = True,
        rate_limit: float = 1.0
    ):
        self.domain = domain
        self.cdx_api_url = "http://web.archive.org/cdx/search/cdx"
        # Note: max_workers, api_only, and scan_files are accepted and stored
        # but not yet wired into the scanning logic below.
        self.max_workers = max_workers
        self.api_only = api_only
        self.scan_files = scan_files
        self.rate_limit = rate_limit
        self.last_request_time = 0

        # Set output file to desktop if not specified
        if output_file:
            self.output_file = output_file
        else:
            # OneDrive-synced Windows machines relocate the Desktop folder
            desktop = os.path.join(os.path.expanduser("~"), "OneDrive", "Desktop")
            if not os.path.exists(desktop):
                desktop = os.path.join(os.path.expanduser("~"), "Desktop")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_file = os.path.join(desktop, f"wayback_security_report_{timestamp}.json")

        # Sensitive file extensions to look for
        self.sensitive_extensions = [
            # Configuration files
            '.env', '.config', '.properties', '.ini', '.conf', '.cfg',
            # Source code files
            '.js', '.py', '.php', '.java', '.rb', '.go', '.cs',
            # Executables and binaries
            '.exe', '.dll', '.so', '.dylib', '.bin',
            # Database files
            '.sql', '.db', '.sqlite', '.mdb',
            # Log files
            '.log', '.txt',
            # Archive files
            '.zip', '.tar', '.gz', '.rar',
            # Certificate files
            '.pem', '.key', '.crt', '.cer',
            # Shell scripts
            '.sh', '.bat', '.ps1', '.cmd'
        ]

        # API key patterns to look for ('-' is kept at the end of each
        # character class so it is treated as a literal, not a range)
        self.api_key_patterns = {
            'google_api': r'AIza[0-9A-Za-z_-]{35}',
            'openai_api': r'sk-[0-9A-Za-z]{32,}',
            'aws_access': r'AKIA[0-9A-Z]{16}',
            # Very noisy: matches any 40-char base64-like run
            'aws_secret': r'[0-9A-Za-z/+]{40}',
            # Hex string with a '-usN' datacenter suffix (Mailchimp-style)
            'firebase': r'[0-9a-f]{32}-us[0-9]{1,2}',
            'github_token': r'ghp_[0-9A-Za-z]{36}',
            'slack_token': r'xox[baprs]-[0-9A-Za-z-]{10,48}',
            'stripe_key': r'sk_live_[0-9a-zA-Z]{24}',
            'twilio_key': r'SK[0-9a-fA-F]{32}',
            'mailgun_key': r'key-[0-9a-f]{32}',
            # Extremely noisy: will also flag hashes and minified assets
            'generic_api_key': r'[0-9A-Za-z]{32,}'
        }

        # Common password patterns
        self.password_patterns = {
            'basic_auth': r'[a-zA-Z0-9._%+-]+:[a-zA-Z0-9._%+-]+@',
            'database_url': r'(postgres|mysql|mongodb)://[a-zA-Z0-9._%+-]+:[a-zA-Z0-9._%+-]+@',
            'jwt_token': r'eyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.([A-Za-z0-9_-]+)'
        }
    def _rate_limit(self):
        """Implement rate limiting between requests."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.rate_limit:
            time.sleep(self.rate_limit - time_since_last)
        self.last_request_time = time.time()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(requests.RequestException)
    )
    def get_archived_urls(self) -> List[List[str]]:
        """Fetch archived URLs from the Wayback Machine CDX API."""
        self._rate_limit()
        # Build a pattern covering all extensions we want to search for
        extensions = [ext.lstrip('.') for ext in self.sensitive_extensions]
        extension_pattern = '|'.join(extensions)
        params = {
            'url': self.domain,
            'matchType': 'domain',
            'output': 'json',
            'fl': 'original,timestamp,statuscode,mimetype',
            'filter': f'urlkey:.*\\.({extension_pattern})$',
            'collapse': 'urlkey',
            'limit': 10000
        }
        all_results = []
        try:
            print("Searching for sensitive files...")
            response = requests.get(self.cdx_api_url, params=params, timeout=60)
            response.raise_for_status()
            data = response.json()
            if len(data) > 1:
                results = data[1:]  # Skip header row
                all_results.extend(results)
                print(f"\nFound {len(results)} files with sensitive extensions")
                # Group and display results by extension
                files_by_ext = {}
                for result in results:
                    url = result[0]
                    ext = os.path.splitext(urlparse(url).path)[1].lower()
                    files_by_ext.setdefault(ext, []).append(url)
                for ext, urls in sorted(files_by_ext.items()):
                    print(f"\n{ext} files ({len(urls)}):")
                    for url in urls:
                        print(f"  {url}")
            return all_results
        except requests.RequestException as e:
            # Note: swallowing the exception here means the @retry decorator
            # above never actually fires; errors are reported once instead.
            print(f"Error fetching archived URLs: {e}")
            return []

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((requests.RequestException, aiohttp.ClientError))
    )
    async def get_archived_urls_async(self, session: aiohttp.ClientSession) -> List[List[str]]:
        """Fetch archived URLs from the Wayback Machine CDX API asynchronously."""
        # Unlike the sync path, no rate limiting is applied here.
        extensions = [ext.lstrip('.') for ext in self.sensitive_extensions]
        extension_pattern = '|'.join(extensions)
        params = {
            'url': self.domain,
            'matchType': 'domain',
            'output': 'json',
            'fl': 'original,timestamp,statuscode,mimetype',
            'filter': f'urlkey:.*\\.({extension_pattern})$',
            'collapse': 'urlkey',
            'limit': 10000
        }
        try:
            print("Searching for sensitive files...")
            async with session.get(self.cdx_api_url, params=params) as response:
                response.raise_for_status()
                # Don't insist on an application/json content-type header
                data = await response.json(content_type=None)
                if len(data) > 1:
                    results = data[1:]
                    print(f"\nFound {len(results)} files with sensitive extensions")
                    # Group and display results by extension
                    files_by_ext = {}
                    for result in results:
                        url = result[0]
                        ext = os.path.splitext(urlparse(url).path)[1].lower()
                        files_by_ext.setdefault(ext, []).append(url)
                    for ext, urls in sorted(files_by_ext.items()):
                        print(f"\n{ext} files ({len(urls)}):")
                        for url in urls:
                            print(f"  {url}")
                    return results
                return []
        except aiohttp.ClientError as e:
            print(f"Error fetching archived URLs: {e}")
            return []
    def scan_content(self, url_data: List[str]) -> Dict[str, Any]:
        """Scan a single URL's content for sensitive information."""
        url, timestamp, status_code, mime_type = url_data
        result = {
            'url': url,
            'timestamp': timestamp,
            'status_code': status_code,
            'mime_type': mime_type,
            'wayback_url': f"http://web.archive.org/web/{timestamp}/{url}",
            'api_keys': [],
            'passwords': [],
            'sensitive_files': []
        }
        # Record the sensitive file
        ext = os.path.splitext(urlparse(url).path)[1].lower()
        if ext in self.sensitive_extensions:
            result['sensitive_files'].append({
                'type': 'file_extension',
                'details': urlparse(url).path,
                'extension': ext
            })
            print(f"Found sensitive file: {url}")
        try:
            wayback_url = result['wayback_url']
            print(f"\nScanning: {wayback_url}")
            response = requests.get(wayback_url, timeout=30)
            response.raise_for_status()
            # Try to get the content, handling both text and binary files
            try:
                content = response.text
                # Scan for API keys
                for key_type, pattern in self.api_key_patterns.items():
                    for match in re.finditer(pattern, content):
                        found_key = match.group()
                        context = content[max(0, match.start() - 50):min(len(content), match.end() + 50)]
                        print(f"Found {key_type}: {found_key}")
                        print(f"Context: {context}\n")
                        result['api_keys'].append({
                            'type': key_type,
                            'key': found_key,
                            'context': context
                        })
                # Scan for passwords
                for pass_type, pattern in self.password_patterns.items():
                    for match in re.finditer(pattern, content):
                        found_pass = match.group()
                        context = content[max(0, match.start() - 50):min(len(content), match.end() + 50)]
                        print(f"Found {pass_type}: {found_pass}")
                        print(f"Context: {context}\n")
                        result['passwords'].append({
                            'type': pass_type,
                            'value': found_pass,
                            'context': context
                        })
            except UnicodeDecodeError:
                # For binary files, just record their presence
                print(f"Binary file detected: {url}")
        except requests.RequestException as e:
            print(f"Error scanning {url}: {e}")
        except Exception as e:
            print(f"Unexpected error scanning {url}: {e}")
        return result

    async def scan_content_async(self, session: aiohttp.ClientSession, url_data: List[str]) -> Dict[str, Any]:
        """Scan a single URL's content for sensitive information asynchronously."""
        url, timestamp, status_code, mime_type = url_data
        result = {
            'url': url,
            'timestamp': timestamp,
            'status_code': status_code,
            'mime_type': mime_type,
            'wayback_url': f"http://web.archive.org/web/{timestamp}/{url}",
            'api_keys': [],
            'passwords': [],
            'sensitive_files': []
        }
        # Record the sensitive file
        ext = os.path.splitext(urlparse(url).path)[1].lower()
        if ext in self.sensitive_extensions:
            result['sensitive_files'].append({
                'type': 'file_extension',
                'details': urlparse(url).path,
                'extension': ext
            })
            print(f"Found sensitive file: {url}")
        try:
            wayback_url = result['wayback_url']
            print(f"\nScanning: {wayback_url}")
            # aiohttp expects a ClientTimeout object rather than a bare number
            timeout = aiohttp.ClientTimeout(total=30)
            async with session.get(wayback_url, timeout=timeout) as response:
                response.raise_for_status()
                try:
                    content = await response.text()
                    # Scan for API keys
                    for key_type, pattern in self.api_key_patterns.items():
                        for match in re.finditer(pattern, content):
                            found_key = match.group()
                            context = content[max(0, match.start() - 50):min(len(content), match.end() + 50)]
                            print(f"Found {key_type}: {found_key}")
                            print(f"Context: {context}\n")
                            result['api_keys'].append({
                                'type': key_type,
                                'key': found_key,
                                'context': context
                            })
                    # Scan for passwords
                    for pass_type, pattern in self.password_patterns.items():
                        for match in re.finditer(pattern, content):
                            found_pass = match.group()
                            context = content[max(0, match.start() - 50):min(len(content), match.end() + 50)]
                            print(f"Found {pass_type}: {found_pass}")
                            print(f"Context: {context}\n")
                            result['passwords'].append({
                                'type': pass_type,
                                'value': found_pass,
                                'context': context
                            })
                except UnicodeDecodeError:
                    # For binary files, just record their presence
                    print(f"Binary file detected: {url}")
        except aiohttp.ClientError as e:
            print(f"Error scanning {url}: {e}")
        except Exception as e:
            print(f"Unexpected error scanning {url}: {e}")
        return result
    def generate_report(self, results: List[Dict[str, Any]]) -> str:
        """Generate a JSON report of findings."""
        report = {
            'domain': self.domain,
            'scan_date': datetime.now().isoformat(),
            # results contains only URLs that produced findings
            'total_urls_scanned': len(results),
            'findings': results
        }
        # Ensure the directory exists; dirname is '' for a bare filename,
        # and os.makedirs('') would raise
        out_dir = os.path.dirname(self.output_file)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(self.output_file, 'w') as f:
            json.dump(report, f, indent=2)
        return self.output_file
    async def scan_async(self) -> List[Dict[str, Any]]:
        """Main scanning function using async/await."""
        print(f"Starting security scan for domain: {self.domain}")
        async with aiohttp.ClientSession() as session:
            archived_urls = await self.get_archived_urls_async(session)
            if not archived_urls:
                print("No archived URLs found for the domain.")
                return []
            print(f"Found {len(archived_urls)} archived URLs. Starting scan...")
            results = []
            tasks = [
                asyncio.create_task(self.scan_content_async(session, url_data))
                for url_data in archived_urls
            ]
            for completed_task in asyncio.as_completed(tasks):
                try:
                    result = await completed_task
                    if result['api_keys'] or result['passwords'] or result['sensitive_files']:
                        results.append(result)
                except Exception as e:
                    print(f"Error processing task: {e}")
            self.generate_report(results)
            self.print_summary(results, len(archived_urls))
            return results

    def scan(self) -> List[Dict[str, Any]]:
        """Main scanning function using synchronous requests."""
        print(f"Starting security scan for domain: {self.domain}")
        archived_urls = self.get_archived_urls()
        if not archived_urls:
            print("No archived URLs found for the domain.")
            return []
        print(f"Found {len(archived_urls)} archived URLs. Starting scan...")
        results = []
        for url_data in archived_urls:
            try:
                result = self.scan_content(url_data)
                if result['api_keys'] or result['passwords'] or result['sensitive_files']:
                    results.append(result)
            except Exception as e:
                print(f"Error processing {url_data[0]}: {e}")
        self.generate_report(results)
        self.print_summary(results, len(archived_urls))
        return results

    def print_summary(self, results: List[Dict[str, Any]], total_urls: int):
        """Print a summary of the scan results."""
        print(f"\nScan complete! Report saved to: {self.output_file}")
        # Group findings by type
        files_by_ext = {}
        api_keys_by_type = {}
        passwords_by_type = {}
        for r in results:
            for file in r['sensitive_files']:
                ext = file.get('extension', 'unknown')
                files_by_ext.setdefault(ext, []).append(r['url'])
            for key in r['api_keys']:
                api_keys_by_type.setdefault(key['type'], []).append(r['url'])
            for pwd in r['passwords']:
                passwords_by_type.setdefault(pwd['type'], []).append(r['url'])
        print("\nDetailed Summary:")
        print("================")
        print("\nSensitive Files Found:")
        for ext, urls in sorted(files_by_ext.items()):
            print(f"\n{ext} files ({len(urls)}):")
            for url in urls:
                print(f"  - {url}")
        print("\nAPI Keys Found:")
        for key_type, urls in sorted(api_keys_by_type.items()):
            print(f"\n{key_type} ({len(urls)}):")
            for url in urls:
                print(f"  - {url}")
        print("\nPasswords/Credentials Found:")
        for pwd_type, urls in sorted(passwords_by_type.items()):
            print(f"\n{pwd_type} ({len(urls)}):")
            for url in urls:
                print(f"  - {url}")
        print("\nStatistics:")
        print(f"- Total URLs scanned: {total_urls}")
        print(f"- Sensitive files found: {sum(len(urls) for urls in files_by_ext.values())}")
        print(f"- API keys found: {sum(len(urls) for urls in api_keys_by_type.values())}")
        print(f"- Passwords found: {sum(len(urls) for urls in passwords_by_type.values())}")
def main():
    parser = argparse.ArgumentParser(description='Scan Wayback Machine archives for sensitive information.')
    parser.add_argument('domain', nargs='?', help='Domain to scan')
    parser.add_argument('--threads', type=int, default=5, help='Number of concurrent workers (default: 5)')
    parser.add_argument('--output', help='Output file path')
    parser.add_argument('--api-only', action='store_true', help='Only scan for API keys')
    parser.add_argument('--no-files', action='store_true', help='Skip file extension scanning')
    parser.add_argument('--rate-limit', type=float, default=1.0, help='Seconds between requests (default: 1.0)')
    parser.add_argument('--use-async', action='store_true', help='Use async/await for better performance')
    args = parser.parse_args()

    # If no domain is provided, ask for it interactively
    if not args.domain:
        print("\nWelcome to the Wayback Machine Security Scanner!")
        print("This tool will scan archived versions of a website for sensitive information.")
        print("Please enter the domain you want to scan (e.g., example.com):")
        args.domain = input("> ").strip()
        if not args.domain:
            print("Error: Domain is required.")
            return

    scanner = WaybackSecurityScanner(
        domain=args.domain,
        max_workers=args.threads,
        output_file=args.output,
        api_only=args.api_only,
        scan_files=not args.no_files,
        rate_limit=args.rate_limit
    )
    if args.use_async:
        asyncio.run(scanner.scan_async())
    else:
        scanner.scan()

if __name__ == "__main__":
    main()
```
Usage
Basic Usage
```bash
python wayback_security_scanner.py example.com
```
Advanced Options
```bash
# Async mode with a custom worker count and rate limit
python wayback_security_scanner.py example.com --use-async --threads 10 --rate-limit 0.5

# API-only scan with a custom output path
python wayback_security_scanner.py example.com --api-only --output results.json

# Skip file extension scanning
python wayback_security_scanner.py example.com --no-files
```
Features
1. Comprehensive File Detection
- Configuration files (.env, .config, .properties, etc.)
- Source code files (.js, .py, .php, etc.)
- Executables and binaries (.exe, .dll, .so, etc.)
- Database files (.sql, .db, etc.)
- Log files (.log, .txt)
- Archive files (.zip, .tar, etc.)
- Certificate files (.pem, .key, etc.)
- Shell scripts (.sh, .bat, etc.)
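The classification itself is a simple extension check. The sketch below mirrors the logic in scan_content; note that urlparse strips the query string before splitext runs:

```python
import os
from urllib.parse import urlparse

SENSITIVE = {'.env', '.sql', '.pem'}  # abbreviated set for illustration

url = "http://example.com/backup/dump.sql?download=1"
ext = os.path.splitext(urlparse(url).path)[1].lower()  # '.sql'
print(ext in SENSITIVE)  # True; the query string is ignored
```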
2. API Key Detection
- Google API keys
- OpenAI API keys
- AWS access keys
- Firebase API keys
- GitHub tokens
- Slack tokens
- Stripe keys
- Twilio keys
- Mailgun keys
- Generic API keys
3. Password Pattern Detection
- Basic authentication credentials
- Database connection strings
- JWT tokens
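As a quick illustration of how these patterns behave, the snippet below runs two of them against fabricated sample text (the credentials are made-up placeholders, not real secrets):

```python
import re

# Made-up sample text; no real credentials here.
sample = ("db = postgres://admin:s3cret@db.internal/app  "
          "key=AIza" + "X" * 35)

db_url = r'(postgres|mysql|mongodb)://[a-zA-Z0-9._%+-]+:[a-zA-Z0-9._%+-]+@'
google = r'AIza[0-9A-Za-z_-]{35}'

print(re.findall(db_url, sample))         # ['postgres'] (the captured group)
print(re.search(google, sample).group())  # 'AIza' followed by 35 chars
```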
4. Performance Features
- Multi-threaded scanning
- Async/await support
- Rate limiting
- Retry logic with exponential backoff
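One caveat to the async support listed above: scan_async creates a task for every URL at once, so --threads does not currently bound concurrency there. If you want a cap, a semaphore wrapper along these lines (not part of the original code) would work:

```python
import asyncio

async def bounded_scan(scanner, session, url_rows, max_workers=5):
    # Hypothetical helper: cap in-flight requests with a semaphore.
    sem = asyncio.Semaphore(max_workers)

    async def guarded(row):
        async with sem:  # at most max_workers requests at a time
            return await scanner.scan_content_async(session, row)

    return await asyncio.gather(*(guarded(r) for r in url_rows))
```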
5. Reporting
- Detailed JSON reports
- Context around found credentials
- Summary statistics
- Organized findings by type
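Because the report is plain JSON, it is easy to post-process. A small sketch that reads a report back (the filename is hypothetical; the scanner timestamps its default output):

```python
import json

# Hypothetical report filename; adjust to your actual output path.
with open("wayback_security_report_20240101_120000.json") as f:
    report = json.load(f)

print(report["domain"], report["scan_date"])
for finding in report["findings"]:
    print(finding["wayback_url"],
          f"{len(finding['api_keys'])} keys,",
          f"{len(finding['passwords'])} credentials")
```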
Requirements
```bash
pip install -r requirements.txt
```

Requirements file:

```
requests==2.31.0
tenacity==8.2.3
aiohttp==3.9.1
```

argparse and typing ship with the Python 3 standard library, so neither argparse nor typing-extensions needs to be installed.