Subdomain Enumeration Tool [Python]

This script performs **subdomain enumeration** for a given domain using multiple OSINT (Open Source Intelligence) sources. It then attempts to resolve the discovered subdomains via DNS and saves the results.

👨‍💻 Coded with curiosity by cybercur@

🔍 Problem [Domain Hijacking]

Domain hijacking is a tactic commonly observed in the e-commerce sector: a threat actor gains unauthorized access to a victim's DNS control panel or hosting provider, then uses that access to modify DNS records and create malicious subdomains that support later phases of the campaign.

💀 Why Would Attackers Create Subdomains?

| Purpose | Example |
| --- | --- |
| Phishing | `login.example.com` mimicking a real service |
| C2 communication (Command & Control) | `c2.example.com` to manage malware |
| Credential harvesting | `secure.example.com` as a fake login portal |
| Spam or malware delivery | `download.example.com` to serve malicious files |
| Persistence | Create DNS entries that aren't easily noticed |
| Abuse trust of root domain | Leverage trust in `example.com` for deception |

🧾 Subdomain Enumeration Tool

My primary use case lies in threat hunting. I developed this script to aid in identifying not only the breadth of infrastructure associated with a campaign but also to surface behavioral patterns and linguistic clues—such as recurring terms or region-specific language usage—across subdomains.

The goal is to gain a more holistic view of the ecosystem behind an attack campaign, uncovering how threat actors structure and reuse their infrastructure when targeting our customers. If you do not have the ability to develop custom tools for your team, a powerful alternative for campaign tracking and infrastructure correlation is URLscan PRO, which offers rich contextual insights at scale.

Source Code

# Library imports for handling CLI args, networking, parsing, concurrency, and file operations
import argparse
import concurrent.futures
import dns.exception
import dns.resolver
import requests
import re
import time
import random
import csv
import os
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ANSI color codes
G = '\033[92m'  # green
Y = '\033[93m'  # yellow
B = '\033[94m'  # blue
R = '\033[91m'  # red
W = '\033[0m'   # reset

# Queries crt.sh for subdomains listed in SSL certificate transparency logs
def search_crt_sh(domain, max_retries=5, backoff_factor=2):
    url = f"https://crt.sh/?q=%.{domain}&output=json"
    session = requests.Session()
    retry = Retry(total=max_retries,
                  backoff_factor=backoff_factor,
                  status_forcelist=[429, 500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("https://", adapter)

    try:
        response = session.get(url, timeout=30)
        if response.status_code == 200:
            data = response.json()
            # name_value can contain several newline-separated names per certificate
            return {sub.strip() for item in data for sub in item['name_value'].split('\n')}
        else:
            print(f"{R}[!] crt.sh returned status code {response.status_code}{W}")
    except requests.exceptions.RequestException as e:
        print(f"{R}[!] Error in crt.sh search: {str(e)}{W}")

    return set()

def search_virustotal(domain, api_key):
    url = f"https://www.virustotal.com/api/v3/domains/{domain}/subdomains"
    headers = {"x-apikey": api_key}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return set(item['id'] for item in data['data'])
    except Exception as e:
        print(f"{R}[!] Error in VirusTotal search: {str(e)}{W}")
    return set()

def search_certspotter(domain):
    url = f"https://api.certspotter.com/v1/issuances?domain={domain}&include_subdomains=true&expand=dns_names"
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            subdomains = set()
            for cert in data:
                subdomains.update(cert.get('dns_names', []))
            return subdomains
    except Exception as e:
        print(f"{R}[!] Error in Certspotter search: {str(e)}{W}")
    return set()

def search_alienvault(domain, api_key):
    url = f"https://otx.alienvault.com/api/v1/indicators/domain/{domain}/passive_dns"
    headers = {"X-OTX-API-KEY": api_key}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return set(item['hostname'] for item in data['passive_dns'])
    except Exception as e:
        print(f"{R}[!] Error in AlienVault OTX search: {str(e)}{W}")
    return set()

def search_dnsdumpster(domain):
    url = "https://dnsdumpster.com/"
    headers = {"Referer": "https://dnsdumpster.com"}
    session = requests.Session()
    try:
        response = session.get(url, timeout=10)
        csrf_token = response.cookies.get('csrftoken')
        data = {
            "csrfmiddlewaretoken": csrf_token,
            "targetip": domain,
        }
        response = session.post(url, data=data, headers=headers, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            return set(re.findall(r'([\w.-]+\.{})\b'.format(re.escape(domain)), soup.text))
    except Exception as e:
        print(f"{R}[!] Error in DNSDumpster search: {str(e)}{W}")
    return set()

def search_engine(domain, engine):
    headers = {'User-Agent': 'Mozilla/5.0'}
    if engine == "google":
        url = f"https://www.google.com/search?q=site%3A{domain}&num=100"
    elif engine == "bing":
        url = f"https://www.bing.com/search?q=site%3A{domain}&count=100"
    elif engine == "yahoo":
        url = f"https://search.yahoo.com/search?p=site%3A{domain}&n=100"
    elif engine == "baidu":
        url = f"https://www.baidu.com/s?wd=site%3A{domain}&rn=100"
    elif engine == "ask":
        url = f"https://www.ask.com/web?q=site%3A{domain}&qo=100"
    else:
        return set()

    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            return set(re.findall(r'([\w.-]+\.{})\b'.format(re.escape(domain)), soup.text))
    except Exception as e:
        print(f"{R}[!] Error in {engine} search: {str(e)}{W}")
    return set()

# Uses DNS to check if the given subdomain resolves to an IP address
def resolve_subdomain(subdomain):
    try:
        dns.resolver.resolve(subdomain, 'A')
        return (subdomain, True)
    except dns.exception.DNSException:
        return (subdomain, False)

# Orchestrates the enumeration process by querying multiple OSINT sources and resolving discovered subdomains
def enumerate_subdomains(domain, virustotal_api_key, alienvault_api_key, verbose=False):
    subdomains = set()
    search_functions = [
        ("Crt.sh", lambda: search_crt_sh(domain)),
        ("Certspotter", lambda: search_certspotter(domain)),
        ("AlienVault OTX", lambda: search_alienvault(domain, alienvault_api_key)),
        ("DNSDumpster", lambda: search_dnsdumpster(domain)),
        ("Google", lambda: search_engine(domain, "google")),
        ("Bing", lambda: search_engine(domain, "bing")),
        ("Yahoo", lambda: search_engine(domain, "yahoo")),
        ("Baidu", lambda: search_engine(domain, "baidu")),
        ("Ask", lambda: search_engine(domain, "ask")),
        ("VirusTotal", lambda: search_virustotal(domain, virustotal_api_key)),
    ]

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(search_functions)) as executor:
        future_to_search = {executor.submit(search_func): (name, search_func) for name, search_func in search_functions}
        for future in concurrent.futures.as_completed(future_to_search):
            name, _ = future_to_search[future]
            result = future.result()
            subdomains.update(result)
            if verbose:
                print(f"{G}[+] {name}: Found {len(result)} subdomains{W}")
            time.sleep(random.uniform(1, 3))

    common_subdomains = ["www", "mail", "ftp", "webmail", "smtp", "pop", "ns1", "webdisk", "ns2", "cpanel", "whm", "autodiscover", "autoconfig", "m", "imap", "test", "ns", "blog", "pop3", "dev", "www2", "admin", "forum", "news", "vpn", "ns3", "mail2", "new", "mysql", "old", "lists", "support", "mobile", "mx", "static", "docs", "beta", "shop", "sql", "secure", "demo", "cp", "calendar", "wiki", "web", "media", "email", "images", "img", "www1", "intranet", "portal", "video", "sip", "dns2", "api", "cdn", "stats", "dns1", "ns4", "www3", "dns", "search", "staging", "server", "mx1", "chat", "wap", "my", "svn", "mail1", "sites", "proxy", "ads", "host", "crm", "cms", "backup", "mx2", "lyncdiscover", "info", "apps", "download", "remote", "db", "forums", "store", "relay", "files", "newsletter", "app", "live", "owa", "en", "start", "sms", "office", "exchange", "ipv4"]
    subdomains.update({f"{sub}.{domain}" for sub in common_subdomains})

    if verbose:
        print(f"{Y}[*] Resolving discovered subdomains...{W}")

    resolved_subdomains = []
    unresolved_subdomains = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        results = list(executor.map(resolve_subdomain, subdomains))
        for subdomain, resolves in results:
            if resolves:
                resolved_subdomains.append(subdomain)
            else:
                unresolved_subdomains.append(subdomain)

    if verbose:
        print(f"{G}[+] Resolved {len(resolved_subdomains)} subdomains{W}")
        print(f"{Y}[+] Found {len(unresolved_subdomains)} unresolved subdomains{W}")

    return resolved_subdomains, unresolved_subdomains

def save_results_as_csv(resolved_subdomains, unresolved_subdomains, domain):
    desktop = os.path.join(os.path.expanduser("~"), "Desktop")
    filename = f"{domain}_subdomains.csv"
    filepath = os.path.join(desktop, filename)

    with open(filepath, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Subdomain", "Status"])
        for subdomain in resolved_subdomains:
            writer.writerow([subdomain, "Resolved"])
        for subdomain in unresolved_subdomains:
            writer.writerow([subdomain, "Unresolved"])

    print(f"{Y}[*] Results saved as CSV: {filepath}{W}")

# Entry point of the script where command-line arguments are parsed and subdomain enumeration is initiated
def main():
    parser = argparse.ArgumentParser(description="Subdomain enumeration tool")
    parser.add_argument("-d", "--domain", required=True, help="Target domain")
    parser.add_argument("-o", "--output", help="Output file to save results")
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    parser.add_argument("--vt-api-key", default="", help="VirusTotal API key")
    parser.add_argument("--av-api-key", default="", help="AlienVault API key")
    args = parser.parse_args()

    print(f"{B}[*] Enumerating subdomains for {args.domain}{W}")
    resolved_subdomains, unresolved_subdomains = enumerate_subdomains(args.domain, args.vt_api_key, args.av_api_key, args.verbose)

    print(f"\n{G}[+] Found {len(resolved_subdomains)} resolved subdomains:{W}")
    for subdomain in sorted(resolved_subdomains):
        print(f"{G}{subdomain}{W}")

    print(f"\n{Y}[+] Found {len(unresolved_subdomains)} unresolved subdomains:{W}")
    for subdomain in sorted(unresolved_subdomains):
        print(f"{Y}{subdomain}{W}")

    if args.output:
        with open(args.output, 'w') as f:
            f.write("Resolved subdomains:\n")
            for subdomain in sorted(resolved_subdomains):
                f.write(f"{subdomain}\n")
            f.write("\nUnresolved subdomains:\n")
            for subdomain in sorted(unresolved_subdomains):
                f.write(f"{subdomain}\n")
        print(f"\n{Y}[*] Results saved to {args.output}{W}")

    save_results_as_csv(resolved_subdomains, unresolved_subdomains, args.domain)

if __name__ == "__main__":
    main()

🧱 Key Components

📦 Imports

Handles:

  • Command-line arguments (argparse)
  • Concurrency (concurrent.futures)
  • HTTP requests (requests)
  • DNS resolution (dns.resolver)
  • Regex matching (re)
  • HTML parsing (BeautifulSoup)
  • File operations and CSV output (os, csv)

🎨 ANSI Color Codes

Defines terminal color codes for:

  • ✅ Green (G)
  • ⚠️ Yellow (Y)
  • 🔵 Blue (B)
  • ❌ Red (R)
  • 🧾 Reset (W)

🔎 Subdomain Data Sources

Each of these functions queries a public source for subdomains:

| Function | Source | Description |
| --- | --- | --- |
| `search_crt_sh` | crt.sh | SSL certificate transparency logs |
| `search_virustotal` | VirusTotal API | Passive DNS |
| `search_certspotter` | CertSpotter API | SSL certificate logs |
| `search_alienvault` | AlienVault OTX API | Passive DNS records |
| `search_dnsdumpster` | DNSDumpster | DNS info (scraped) |
| `search_engine` | Google, Bing, Yahoo, Baidu, Ask | Indexed subdomains |

🌐 DNS Resolution

  • resolve_subdomain(subdomain)
    Attempts to resolve the given subdomain using a DNS A-record lookup.
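
If installing dnspython is not an option, a rough stdlib-only fallback (a sketch, not part of the script above) can use `socket.gethostbyname`, which goes through the OS resolver and offers less control than `dns.resolver`:

```python
import socket

def resolve_subdomain_stdlib(subdomain):
    """Rough fallback: resolve via the OS resolver instead of dnspython."""
    try:
        socket.gethostbyname(subdomain)  # raises socket.gaierror if the name does not resolve
        return (subdomain, True)
    except socket.gaierror:
        return (subdomain, False)
```

Note that this only performs A-record lookups and cannot distinguish NXDOMAIN from other resolution failures.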

🤖 Enumeration Logic

  • enumerate_subdomains(...)
    • Executes all search functions in parallel
    • Adds a predefined list of common subdomains (e.g., www, mail, ftp)
    • Resolves all discovered subdomains
    • Separates them into resolved and unresolved lists
    • Supports a verbose flag for live output
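
Certificate-transparency sources often return wildcard entries such as `*.example.com`, mixed-case names, and hits outside the target domain. A small normalization helper (hypothetical, not part of the script) could keep the merged result set clean:

```python
def normalize_subdomains(names, domain):
    """Strip wildcard prefixes, lowercase, and keep only names under the target domain."""
    cleaned = set()
    for name in names:
        name = name.strip().lower().lstrip('*.')  # drop wildcard prefixes like '*.'
        if name == domain or name.endswith('.' + domain):
            cleaned.add(name)
    return cleaned
```

Running the aggregated set through a pass like this before DNS resolution avoids wasting lookups on names that can never resolve.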

💾 Output Functions

  • save_results_as_csv(...)
    Writes subdomains with status (Resolved / Unresolved) to a .csv file on the user’s Desktop.
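
The resulting CSV is easy to post-process. For example, a short snippet (the file name here is hypothetical) to pull only the resolved hosts back out for triage:

```python
import csv

def load_resolved(csv_path):
    """Return the subdomains marked 'Resolved' in the tool's CSV output."""
    with open(csv_path, newline='') as f:
        return [row["Subdomain"] for row in csv.DictReader(f)
                if row["Status"] == "Resolved"]
```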

🧠 Main Function

  • main()
    • Parses CLI arguments like:
      • -d or --domain: Target domain
      • -v or --verbose: Show detailed output
      • --vt-api-key: VirusTotal API key
      • --av-api-key: AlienVault API key
    • Calls the enumeration and prints results
    • Optionally saves results to a file if -o is provided

🛠️ Example Usage

python subenum.py -d example.com -v --vt-api-key YOUR_KEY --av-api-key YOUR_KEY

To also write the results to a plain-text file, add -o:

python subenum.py -d example.com -o results.txt

A CSV copy is always saved to the Desktop as example.com_subdomains.csv.

All rights reserved.