"""
tesla_security_scan.py - Enhanced Security Scanner for Tesla Subdomains using Shodan API

Features:
- Read subdomains from .txt file or command line
- Query Shodan API for each host with detailed info (ports, services, versions)
- Detect CVEs using OS and service banner intelligence
- Generate structured JSON report with vulnerability analysis
- Provide executive summary review
"""

import argparse
import json
import re
import sys
from collections import defaultdict
from datetime import datetime

import shodan

# Compiled once: the same pattern is applied to every OS string and banner.
_CVE_PATTERN = re.compile(r'(CVE-[0-9]{4}-[0-9]+)')

# Only the first lines of a banner are kept in the report to bound its size.
_MAX_BANNER_LINES = 10


def load_subdomains_from_file(file_path):
    """Load subdomains from a .txt file.

    Args:
        file_path: Path of a UTF-8 text file with one subdomain per line.

    Returns:
        The stripped, non-blank lines of the file, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f if line.strip()]


def _normalize_service_entry(raw_entry):
    """Map one raw Shodan service record to the entry shape used by this module.

    NOTE(review): assumes the Shodan banner schema (``port``, ``transport``,
    ``product``, ``version``, ``data``, ``hostnames``, ``_shodan.module``);
    confirm against the Shodan banner specification. Missing fields fall back
    to neutral defaults, so an unexpected schema degrades instead of crashing.
    """
    shodan_meta = raw_entry.get('_shodan') or {}
    hostnames = raw_entry.get('hostnames') or []
    module_name = shodan_meta.get('module') or 'unknown'
    product = raw_entry.get('product')
    return {
        'port': raw_entry.get('port'),
        'transport': raw_entry.get('transport') or 'tcp',
        'name': module_name,
        'product': product,
        'version': raw_entry.get('version'),
        'banner': raw_entry.get('data') or '',
        'host': hostnames[0] if hostnames else '',
        'app': product or module_name,
    }


def _port_number(port_entry):
    """Return the port number of a ``ports`` entry (a dict entry or a bare number)."""
    if isinstance(port_entry, dict):
        return port_entry.get('port')
    return port_entry


def search_host_shodan(ip_address, api_key):
    """Get detailed host information from Shodan API.

    Args:
        ip_address: IP address to look up.
        api_key: Shodan API key.

    Returns:
        A dict with keys ``ip``, ``ports``, ``services``, ``os``, ``org``,
        ``last_update``, ``banners`` and ``vulnerabilities``. ``ports`` holds
        one entry per service reported by Shodan, in the shape consumed by
        ``extract_service_info``. If the lookup fails, the dict also carries
        an ``error`` message and every collection is empty.
    """
    try:
        api = shodan.Shodan(api_key)
        results = api.host(ip_address)
    except shodan.APIError as e:
        return {
            'ip': ip_address,
            'error': str(e),
            'ports': [],
            'services': {},
            'os': None,
            'org': 'Unknown',
            'last_update': datetime.now().isoformat(),
            'banners': [],
            'vulnerabilities': [],
        }

    raw_services = [
        entry for entry in (results.get('data') or [])
        if isinstance(entry, dict)
    ]
    if raw_services:
        port_entries = [_normalize_service_entry(entry) for entry in raw_services]
    else:
        # No per-service records: fall back to the bare port list, which
        # extract_service_info also accepts.
        port_entries = list(results.get('ports') or [])

    host = {
        'ip': ip_address,
        'ports': port_entries,
        'services': {},
        'os': results.get('os', None),
        'org': results.get('org') or 'Unknown',
        # Prefer the timestamp reported by Shodan; fall back to local time.
        'last_update': results.get('last_update') or datetime.now().isoformat(),
        'banners': [],
        'vulnerabilities': [],
    }
    host['services'] = extract_service_info(host)
    host['banners'] = [
        banner
        for service in host['services'].values()
        for banner in service['banners']
    ]
    return host


def extract_service_info(shodan_data):
    """Extract and organize service information from Shodan data.

    Args:
        shodan_data: Host dict whose ``ports`` list contains either dict
            entries (keys ``port``, ``transport``, ``name``, ``product``,
            ``version``, ``banner``, ``host``, ``app``) or bare port numbers.

    Returns:
        A dict keyed by the port number as a string. Each value has ``port``,
        ``protocol``, ``service``, ``version`` and ``banners``, plus
        ``ssl: True`` when the banner mentions TLS or SSL. Entries sharing a
        port number overwrite each other (last one wins).
    """
    services = {}
    if not shodan_data or 'ports' not in shodan_data:
        return services

    for port_entry in shodan_data.get('ports') or []:
        if not isinstance(port_entry, dict):
            # A bare port number carries no service metadata.
            port_entry = {'port': port_entry}

        port_num = port_entry.get('port')
        # Keep both product and version when available ("nginx 1.18.0").
        version_parts = [
            str(part)
            for part in (port_entry.get('product'), port_entry.get('version'))
            if part
        ]
        service_info = {
            'port': port_num,
            'protocol': port_entry.get('transport') or 'tcp',
            'service': port_entry.get('name') or 'unknown',
            'version': ' '.join(version_parts) or 'unknown',
            'banners': [],
        }

        banner = port_entry.get('banner') or ''
        if isinstance(banner, bytes):
            banner = banner.decode('utf-8', errors='replace')
        elif not isinstance(banner, str):
            banner = str(banner)

        if banner:
            service_info['banners'].append({
                'raw': '\n'.join(banner.split('\n')[:_MAX_BANNER_LINES]),
                'hostname': port_entry.get('host', ''),
                'app': port_entry.get('app', ''),
                # Lets CVE findings be attributed to the right port.
                'port': port_num,
            })
            banner_upper = banner.upper()
            if 'TLS' in banner_upper or 'SSL' in banner_upper:
                service_info['ssl'] = True

        services[str(port_num)] = service_info

    return services


def detect_os_cves(os_string):
    """Detect CVEs mentioned in OS information.

    Args:
        os_string: OS description, or ``None``/empty when unknown.

    Returns:
        One finding dict (``cve_id``, ``source``, ``description``) per
        distinct CVE identifier found in ``os_string``, in order of first
        appearance.
    """
    if not os_string:
        return []

    # dict.fromkeys de-duplicates while preserving first-seen order.
    detected_cves = dict.fromkeys(_CVE_PATTERN.findall(str(os_string)))
    return [
        {
            'cve_id': cve_id,
            'source': 'OS',
            'description': f'Potential vulnerability in {os_string}',
        }
        for cve_id in detected_cves
    ]


def detect_service_cves(banners):
    """Detect CVEs mentioned in service banners.

    Args:
        banners: Iterable of banner dicts with ``raw`` text and optional
            ``app``, ``port`` and ``hostname`` keys.

    Returns:
        A list of finding dicts (``cve_id``, ``source``, ``port``,
        ``description``), one per distinct (CVE, source, port) combination.
    """
    all_cves = []
    if not banners:
        return all_cves

    seen = set()
    for banner_info in banners:
        raw_content = banner_info.get('raw') or ''
        if not isinstance(raw_content, str):
            raw_content = str(raw_content)

        source = banner_info.get('app') or 'Service'
        port = banner_info.get('port', 0)
        location = banner_info.get('hostname') or 'service'

        for cve_id in _CVE_PATTERN.findall(raw_content):
            finding_key = (cve_id, source, port)
            if finding_key in seen:
                continue
            seen.add(finding_key)
            all_cves.append({
                'cve_id': cve_id,
                'source': source,
                'port': port,
                'description': f'Potential vulnerability found in {location}',
            })

    return all_cves


def analyze_host_vulnerabilities(host_data):
    """Analyze a host for vulnerabilities from OS and services.

    Args:
        host_data: Host dict with optional ``os`` and ``banners`` keys.

    Returns:
        The same ``host_data`` object, with ``vulnerabilities`` replaced by
        the OS findings followed by the service-banner findings.
    """
    detected_cves = []
    detected_cves.extend(detect_os_cves(host_data.get('os')))
    detected_cves.extend(detect_service_cves(host_data.get('banners', [])))
    host_data['vulnerabilities'] = detected_cves
    return host_data


def _empty_scan_result(subdomain):
    """Return the scan result used when a subdomain yields no hosts."""
    return {
        'subdomain': subdomain,
        'hosts': [],
        'total_hosts': 0,
        'unique_ports': set(),
        'unique_services': set(),
        # An int, like every other path, so summary arithmetic works.
        'vulnerable_hosts': 0,
    }


def scan_subdomain(subdomain, api_key):
    """Scan all hosts associated with a subdomain.

    Runs a ``hostname:<subdomain>`` Shodan search, then looks up each
    distinct IP once and analyzes it for CVE mentions.

    Args:
        subdomain: Hostname to search for.
        api_key: Shodan API key.

    Returns:
        A dict with ``subdomain``, ``hosts`` (list of per-host dicts),
        ``total_hosts``, ``unique_ports`` (set of port strings),
        ``unique_services`` (set of service names) and ``vulnerable_hosts``
        (int). A failed search is reported on stderr and yields an empty
        result.
    """
    api = shodan.Shodan(api_key)
    try:
        results = api.search(f"hostname:{subdomain}")
    except shodan.APIError as e:
        print(f"Error querying Shodan for {subdomain}: {e}", file=sys.stderr)
        return _empty_scan_result(subdomain)

    # A search returns one match per service, so the same IP can appear many
    # times; look each host up only once, in first-seen order.
    ip_addresses = list(dict.fromkeys(
        match.get('ip_str')
        for match in (results.get('matches') or [])
        if match.get('ip_str')
    ))
    if not ip_addresses:
        return _empty_scan_result(subdomain)

    hosts = []
    unique_ports = set()
    all_services = set()
    vulnerable_count = 0

    for ip in ip_addresses:
        host_info = search_host_shodan(ip, api_key)
        services = extract_service_info(host_info)
        analyze_host_vulnerabilities(host_info)

        host_ports = {
            str(number)
            for number in map(_port_number, host_info.get('ports', []))
            if number is not None
        }
        unique_ports.update(host_ports)
        all_services.update(
            info['service']
            for info in services.values()
            if info['service'] != 'unknown'
        )

        if host_info['vulnerabilities']:
            vulnerable_count += 1

        host_record = {
            'ip': host_info['ip'],
            'org': host_info.get('org', 'Unknown'),
            'os': host_info.get('os'),
            'ports_count': len(host_ports),
            'services': services,
            'banners': host_info.get('banners', []),
            'vulnerabilities': host_info['vulnerabilities'],
        }
        if 'error' in host_info:
            # Surface failed lookups instead of showing a silently empty host.
            host_record['error'] = host_info['error']
        hosts.append(host_record)

    return {
        'subdomain': subdomain,
        'hosts': hosts,
        'total_hosts': len(hosts),
        'unique_ports': unique_ports,
        'unique_services': all_services,
        'vulnerable_hosts': vulnerable_count,
    }


def generate_summary_report(all_results):
    """Generate a summary of scan results.

    Args:
        all_results: List of result dicts as returned by ``scan_subdomain``.

    Returns:
        A dict with ``total_subdomains_scanned``, ``total_hosts_found``,
        ``unique_services_identified``, ``hosts_with_cves``,
        ``total_vulnerable_hosts`` and ``scan_timestamp``. Despite its name,
        ``hosts_with_cves`` counts the scanned subdomains that have at least
        one vulnerable host; ``total_vulnerable_hosts`` is the host count.
    """
    unique_services = set()
    total_hosts = 0
    vulnerable_hosts_count = 0
    results_with_cves = 0

    for result in all_results:
        total_hosts += result.get('total_hosts', 0)
        unique_services.update(result.get('unique_services') or [])
        vulnerable_here = result.get('vulnerable_hosts', 0)
        vulnerable_hosts_count += vulnerable_here
        if vulnerable_here > 0:
            results_with_cves += 1

    return {
        'total_subdomains_scanned': len(all_results),
        'total_hosts_found': total_hosts,
        'unique_services_identified': len(unique_services),
        'hosts_with_cves': results_with_cves,
        'total_vulnerable_hosts': vulnerable_hosts_count,
        'scan_timestamp': datetime.now().isoformat(),
    }


def _json_default(value):
    """Serialize the sets used in scan results as sorted lists for JSON output."""
    if isinstance(value, (set, frozenset)):
        return sorted(value, key=str)
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def main():
    """Parse command-line arguments, run the scan and print or save the report."""
    parser = argparse.ArgumentParser(
        description="Enhanced Security Scanner for Tesla Subdomains using Shodan API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python tesla_security_scan.py -d tesla.com -s YOUR_API_KEY -o report.json
  python tesla_security_scan.py -i subdomains.txt -s YOUR_API_KEY --json-output scan_results.json
"""
    )
    parser.add_argument(
        "-d", "--domain",
        dest="domain",
        help="Single domain to scan (e.g., tesla.com)"
    )
    parser.add_argument(
        "-i", "--input-file",
        dest="file_name",
        help=".txt file containing subdomains to scan"
    )
    parser.add_argument(
        "-s", "--shodan-key",
        dest="shodan_key",
        required=True,
        help="Shodan API Key"
    )
    parser.add_argument(
        "-o", "--json-output",
        dest="json_output",
        default=None,
        help="Output JSON file for detailed results (default: console)"
    )
    parser.add_argument(
        "-q", "--quick-summary",
        action="store_true",
        help="Print quick summary only"
    )
    args = parser.parse_args()

    if not args.domain and not args.file_name:
        print("Error: Please provide either -d (domain) or -i (input file)", file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    # Constructing the client performs no request, so make one cheap call to
    # confirm the key is accepted before starting the scan.
    try:
        api = shodan.Shodan(args.shodan_key)
        api.info()
    except shodan.APIError as e:
        print(f"Error initializing Shodan API: {e}", file=sys.stderr)
        sys.exit(1)

    if args.file_name:
        try:
            subdomains = load_subdomains_from_file(args.file_name)
        except OSError as e:
            print(f"Error reading input file {args.file_name}: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        subdomains = [args.domain]

    if not subdomains:
        print("Error: No subdomains to scan", file=sys.stderr)
        sys.exit(1)

    print(f"[*] Starting security scan for {len(subdomains)} target(s)...")
    print("[+] API Key validated (Shodan)")
    print()

    all_results = []
    for i, subdomain in enumerate(subdomains, 1):
        print(f"[*] Scanning {i}/{len(subdomains)}: {subdomain}...")
        all_results.append(scan_subdomain(subdomain, args.shodan_key))

    summary = generate_summary_report(all_results)
    full_report = {
        'summary': summary,
        'detailed_results': all_results,
    }

    if args.json_output:
        with open(args.json_output, 'w', encoding='utf-8') as f:
            # Scan results contain sets, which json cannot serialize natively.
            json.dump(full_report, f, indent=2, ensure_ascii=False,
                      default=_json_default)
        print(f"\n[+] Detailed JSON report saved to: {args.json_output}")
    elif not args.quick_summary:
        # Documented default: detailed results go to the console unless the
        # user asked for the quick summary only.
        print()
        print(json.dumps(full_report, indent=2, ensure_ascii=False,
                         default=_json_default))

    print("\n" + "=" * 70)
    print("EXECUTIVE SECURITY SUMMARY")
    print("=" * 70)
    print()
    print(f"[INFO] Total Subdomains Scanned : {summary['total_subdomains_scanned']}")
    print(f"[INFO] Total Hosts Identified : {summary['total_hosts_found']}")
    print(f"[INFO] Unique Services Detected : {summary['unique_services_identified']}")
    print()

    subdomains_with_cves = summary.get('hosts_with_cves', 0)
    total_vuln = summary.get('total_vulnerable_hosts', 0)
    total_found = summary['total_hosts_found']

    if subdomains_with_cves > 0:
        # This figure counts subdomains, so label it as such.
        print(f"[ALERT] Subdomains with CVE Hits : {subdomains_with_cves}")
        print(f"[CRITICAL] Total Vulnerable Hosts : {total_vuln}")
        if total_vuln > 0:
            vulnerability_rate = (total_vuln / total_found * 100) if total_found > 0 else 0
            print(f"[STAT] Vulnerability Rate : {vulnerability_rate:.2f}%")
    elif total_found > 0:
        print(f"[INFO] No CVEs Detected in {total_found} hosts")

    print()
    print("=" * 70)
    print("SCAN COMPLETE")
    print("=" * 70)


if __name__ == "__main__":
    main()