feat: Semaine 10
This commit is contained in:
@@ -0,0 +1,360 @@
|
||||
"""
|
||||
tesla_security_scan.py - Enhanced Security Scanner for Tesla Subdomains using Shodan API
|
||||
|
||||
Features:
|
||||
- Read subdomains from .txt file or command line
|
||||
- Query Shodan API for each host with detailed info (ports, services, versions)
|
||||
- Detect CVEs using OS and service banner intelligence
|
||||
- Generate structured JSON report with vulnerability analysis
|
||||
- Provide executive summary review
|
||||
"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import shodan
|
||||
import re
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
def load_subdomains_from_file(file_path):
|
||||
"""Load subdomains from a .txt file."""
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
domains = [line.strip() for line in f if line.strip()]
|
||||
return domains
|
||||
|
||||
def search_host_shodan(ip_address, api_key):
|
||||
"""Get detailed host information from Shodan API."""
|
||||
try:
|
||||
api = shodan.Shodan(api_key)
|
||||
|
||||
results = api.host(ip_address)
|
||||
|
||||
return {
|
||||
'ip': ip_address,
|
||||
'ports': [],
|
||||
'services': {},
|
||||
'os': results.get('os', None),
|
||||
'org': results.get('org', 'Unknown'),
|
||||
'last_update': datetime.now().isoformat(),
|
||||
'banners': [],
|
||||
'vulnerabilities': []
|
||||
}
|
||||
except shodan.APIError as e:
|
||||
return {
|
||||
'ip': ip_address,
|
||||
'error': str(e),
|
||||
'ports': [],
|
||||
'services': {},
|
||||
'os': None,
|
||||
'org': 'Unknown',
|
||||
'last_update': datetime.now().isoformat(),
|
||||
'banners': []
|
||||
}
|
||||
|
||||
|
||||
def extract_service_info(shodan_data):
|
||||
"""Extract and organize service information from Shodan data."""
|
||||
services = {}
|
||||
|
||||
if not shodan_data or 'ports' not in shodan_data:
|
||||
return services
|
||||
|
||||
for port_entry in shodan_data.get('ports', []):
|
||||
port_num = port_entry.get('port')
|
||||
banner = port_entry.get('banner', '')
|
||||
|
||||
service_info = {
|
||||
'port': port_num,
|
||||
'protocol': port_entry.get('transport', 'tcp'),
|
||||
'service': port_entry.get('name', 'unknown'),
|
||||
'version': port_entry.get('product') or port_entry.get('version', 'unknown'),
|
||||
'banners': []
|
||||
}
|
||||
|
||||
if banner:
|
||||
try:
|
||||
banner_lines = banner.split('\n')
|
||||
service_info['banners'].append({
|
||||
'raw': '\n'.join(banner_lines[:10]),
|
||||
'hostname': port_entry.get('host', ''),
|
||||
'app': port_entry.get('app', '')
|
||||
})
|
||||
|
||||
banner_upper = banner.upper()
|
||||
if 'TLS' in banner_upper or 'SSL' in banner_upper:
|
||||
service_info['ssl'] = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
services[str(port_num)] = service_info
|
||||
|
||||
return services
|
||||
|
||||
|
||||
def detect_os_cves(os_string):
|
||||
"""Detect CVEs mentioned in OS information."""
|
||||
if not os_string:
|
||||
return []
|
||||
|
||||
cve_pattern = r'(CVE-[0-9]{4}-[0-9]+)'
|
||||
detected_cves = re.findall(cve_pattern, os_string)
|
||||
|
||||
os_info = {
|
||||
'cves': detected_cves,
|
||||
'os_type': os_string.split(' ')[0] if os_string else None,
|
||||
'kernel_version': os_string.split(' (')[0].split('.')[-1] if ' (' in os_string else None
|
||||
}
|
||||
|
||||
return [{'cve_id': cve_id, 'source': 'OS', 'description': f'Potential vulnerability in {os_string}'} for cve_id in detected_cves]
|
||||
|
||||
|
||||
def detect_service_cves(banners):
|
||||
"""Detect CVEs mentioned in service banners."""
|
||||
all_cves = []
|
||||
|
||||
if not banners:
|
||||
return all_cves
|
||||
|
||||
cve_pattern = r'(CVE-[0-9]{4}-[0-9]+)'
|
||||
|
||||
for banner_info in banners:
|
||||
raw_content = banner_info.get('raw', '')
|
||||
|
||||
detected_cves = re.findall(cve_pattern, raw_content)
|
||||
|
||||
if detected_cves:
|
||||
for cve_id in detected_cves:
|
||||
all_cves.append({
|
||||
'cve_id': cve_id,
|
||||
'source': banner_info.get('app', 'Service'),
|
||||
'port': banner_info.get('port', 0),
|
||||
'description': f'Potential vulnerability found in {banner_info.get("hostname", "service")}'
|
||||
})
|
||||
|
||||
return all_cves
|
||||
|
||||
|
||||
def analyze_host_vulnerabilities(host_data):
|
||||
"""Analyze a host for vulnerabilities from OS and services."""
|
||||
detected_cves = []
|
||||
|
||||
os_cves = detect_os_cves(host_data.get('os'))
|
||||
detected_cves.extend(os_cves)
|
||||
|
||||
service_cves = detect_service_cves(host_data.get('banners', []))
|
||||
detected_cves.extend(service_cves)
|
||||
|
||||
host_data['vulnerabilities'] = detected_cves
|
||||
|
||||
return host_data
|
||||
|
||||
|
||||
def scan_subdomain(subdomain, api_key):
|
||||
"""Scan all hosts associated with a subdomain."""
|
||||
api = shodan.Shodan(api_key)
|
||||
|
||||
try:
|
||||
results = api.search(f"hostname:{subdomain}")
|
||||
|
||||
if not results.get('matches'):
|
||||
return {
|
||||
'subdomain': subdomain,
|
||||
'hosts': [],
|
||||
'total_hosts': 0,
|
||||
'unique_services': set(),
|
||||
'vulnerable_hosts': []
|
||||
}
|
||||
|
||||
hosts = []
|
||||
unique_ports = set()
|
||||
all_services = set()
|
||||
vulnerable_count = 0
|
||||
|
||||
for match in results['matches']:
|
||||
ip = match.get('ip_str')
|
||||
if not ip:
|
||||
continue
|
||||
|
||||
host_info = search_host_shodan(ip, api_key)
|
||||
services = extract_service_info(host_info)
|
||||
unique_ports.update(str(p) for p in host_info.get('ports', []))
|
||||
all_services.update(services.keys())
|
||||
vulnerable_host = analyze_host_vulnerabilities(host_info)
|
||||
|
||||
if vulnerable_host['vulnerabilities']:
|
||||
vulnerable_count += 1
|
||||
|
||||
hosts.append({
|
||||
'ip': host_info['ip'],
|
||||
'org': host_info.get('org', 'Unknown'),
|
||||
'os': host_info.get('os'),
|
||||
'ports_count': len(host_info.get('ports', [])),
|
||||
'services': services,
|
||||
'banners': host_info.get('banners', []),
|
||||
'vulnerabilities': vulnerable_host['vulnerabilities']
|
||||
})
|
||||
|
||||
return {
|
||||
'subdomain': subdomain,
|
||||
'hosts': hosts,
|
||||
'total_hosts': len(hosts),
|
||||
'unique_ports': unique_ports,
|
||||
'unique_services': all_services,
|
||||
'vulnerable_hosts': vulnerable_count
|
||||
}
|
||||
|
||||
except shodan.APIError as e:
|
||||
print(f"Error querying Shodan for {subdomain}: {e}", file=sys.stderr)
|
||||
return {
|
||||
'subdomain': subdomain,
|
||||
'hosts': [],
|
||||
'total_hosts': 0,
|
||||
'unique_services': set(),
|
||||
'vulnerable_hosts': 0
|
||||
}
|
||||
|
||||
|
||||
def generate_summary_report(all_results):
|
||||
"""Generate a summary of scan results."""
|
||||
total_hosts = sum(r['total_hosts'] for r in all_results)
|
||||
unique_services = set()
|
||||
vulnerable_hosts_count = 0
|
||||
|
||||
for result in all_results:
|
||||
unique_services.update(result.get('unique_services', []))
|
||||
vulnerable_hosts_count += result.get('vulnerable_hosts', 0)
|
||||
|
||||
hosts_with_cves = sum(1 for r in all_results if r['vulnerable_hosts'] > 0)
|
||||
|
||||
return {
|
||||
'total_subdomains_scanned': len(all_results),
|
||||
'total_hosts_found': total_hosts,
|
||||
'unique_services_identified': len(unique_services),
|
||||
'hosts_with_cves': hosts_with_cves,
|
||||
'total_vulnerable_hosts': vulnerable_hosts_count,
|
||||
'scan_timestamp': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Enhanced Security Scanner for Tesla Subdomains using Shodan API",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
python tesla_security_scan.py -d tesla.com -s YOUR_API_KEY -o report.json
|
||||
python tesla_security_scan.py -i subdomains.txt -s YOUR_API_KEY --json-output scan_results.json
|
||||
"""
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-d", "--domain",
|
||||
dest="domain",
|
||||
help="Single domain to scan (e.g., tesla.com)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i", "--input-file",
|
||||
dest="file_name",
|
||||
help=".txt file containing subdomains to scan"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s", "--shodan-key",
|
||||
dest="shodan_key",
|
||||
required=True,
|
||||
help="Shodan API Key"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o", "--json-output",
|
||||
dest="json_output",
|
||||
default=None,
|
||||
help="Output JSON file for detailed results (default: console)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-q", "--quick-summary",
|
||||
action="store_true",
|
||||
help="Print quick summary only"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.domain and not args.file_name:
|
||||
print("Error: Please provide either -d (domain) or -i (input file)", file=sys.stderr)
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
api = shodan.Shodan(args.shodan_key)
|
||||
except shodan.APIError as e:
|
||||
print(f"Error initializing Shodan API: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if args.file_name:
|
||||
subdomains = load_subdomains_from_file(args.file_name)
|
||||
else:
|
||||
subdomains = [args.domain]
|
||||
|
||||
print(f"[*] Starting security scan for {len(subdomains)} target(s)...")
|
||||
print(f"[+] API Key validated (Shodan)")
|
||||
print()
|
||||
|
||||
all_results = []
|
||||
total_hosts = 0
|
||||
total_services = set()
|
||||
vulnerable_count = 0
|
||||
|
||||
for i, subdomain in enumerate(subdomains, 1):
|
||||
print(f"[*] Scanning {i}/{len(subdomains)}: {subdomain}...")
|
||||
|
||||
result = scan_subdomain(subdomain, args.shodan_key)
|
||||
all_results.append(result)
|
||||
|
||||
total_hosts += result['total_hosts']
|
||||
total_services.update(result.get('unique_services', []))
|
||||
vulnerable_count += result.get('vulnerable_hosts', 0)
|
||||
|
||||
summary = generate_summary_report(all_results)
|
||||
|
||||
if args.json_output:
|
||||
full_report = {
|
||||
'summary': summary,
|
||||
'detailed_results': all_results
|
||||
}
|
||||
|
||||
with open(args.json_output, 'w', encoding='utf-8') as f:
|
||||
json.dump(full_report, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"\n[+] Detailed JSON report saved to: {args.json_output}")
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
print("EXECUTIVE SECURITY SUMMARY")
|
||||
print("=" * 70)
|
||||
print()
|
||||
|
||||
print(f"[INFO] Total Subdomains Scanned : {summary['total_subdomains_scanned']}")
|
||||
print(f"[INFO] Total Hosts Identified : {summary['total_hosts_found']:.0f}")
|
||||
print(f"[INFO] Unique Services Detected : {summary['unique_services_identified']:.0f}")
|
||||
print()
|
||||
|
||||
vuln_summary = summary.get('hosts_with_cves', 0)
|
||||
total_vuln = summary.get('total_vulnerable_hosts', 0)
|
||||
|
||||
if vuln_summary > 0:
|
||||
print(f"[ALERT] Hosts with CVE Detection : {vuln_summary}")
|
||||
print(f"[CRITICAL] Total Vulnerable Hosts : {total_vuln}")
|
||||
|
||||
if total_vuln > 0:
|
||||
vulnerability_rate = (total_vuln / summary['total_hosts_found'] * 100) if summary['total_hosts_found'] > 0 else 0
|
||||
print(f"[STAT] Vulnerability Rate : {vulnerability_rate:.2f}%")
|
||||
elif total_hosts > 0:
|
||||
print(f"[INFO] No CVEs Detected in {summary['total_subdomains_scanned']} hosts")
|
||||
|
||||
print()
|
||||
print("=" * 70)
|
||||
print("SCAN COMPLETE")
|
||||
print("=" * 70)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user