### Exercise 1
"""
For each situation, state whether the use case calls for:
- Threading
- MultiProcessing
- Neither

1. You are developing a script that downloads 50 PDF files from a remote server
   ==> Threading ✅
2. Your program must apply a blur filter to 500 HD images
   ==> MultiProcessing ✅
3. You are writing a chatbot that listens for messages from 10 simultaneous
   users on a socket server
   ==> Threading ✅
4. Your script reads a 20-line CSV file and prints the result to the console
   ==> MultiProcessing ⛔ (Neither)
5. You must train 4 independent Machine Learning models on different datasets
   ==> Threading ⛔ (MultiProcessing)
6. Your web app must send a confirmation email and a Slack notification for
   each sign-up
   ==> Asynchronous threading ✅
7. You are writing a script that computes the prime numbers up to 10 million,
   splitting the range into 4 sub-ranges
   ==> MultiProcessing ✅
8. Your program must watch 3 folders at the same time and react when a new
   file appears
   ==> Threading ✅
"""

### Exercise 2
import hashlib
import os
import time
from multiprocessing import Manager, Process, current_process
from threading import Thread, current_thread

import requests


def _worker_tag() -> str:
    """Return the ``pid - process name - thread name`` prefix used in log lines."""
    return f"{os.getpid()} - {current_process().name} - {current_thread().name}"


def io_bound(sec):
    """Simulate an I/O-bound task by sleeping for ``sec`` seconds.

    Prints a start and a stop line tagged with the current pid, process name
    and thread name.
    """
    tag = _worker_tag()
    print(f"{tag} => Start sleeping...")
    time.sleep(sec)
    print(f"{tag} => Stop sleeping...")


def cpu_bound(n):
    """Simulate a CPU-bound task by counting down from ``n`` to zero.

    Prints a start and a stop line tagged with the current pid, process name
    and thread name.
    """
    tag = _worker_tag()
    print(f"{tag} => Start counting...")
    # Deliberate busy loop: the point is to burn CPU, not to compute a value.
    while n > 0:
        n -= 1
    print(f"{tag} => Stop counting...")


###### Part A: IO-bound

URLS = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
    "https://httpbin.org/uuid",
    "https://httpbin.org/base64/SWYgeW91IGNhbiByZWFkIHRoaXMsIGJyYXZvICE=",
    "https://httpbin.org/bytes/100",
    "https://httpbin.org/delay/0",
]

SLEEP = 1

# Upper bound (in seconds) on each HTTP request so that a stalled connection
# raises instead of blocking the script or a worker thread forever.
REQUEST_TIMEOUT = 30


def get_status_sequential(urls: list[str]) -> None:
    """Fetch each URL one after the other and print its HTTP status code.

    Sleeps ``SLEEP`` seconds before every request.
    NOTE: the threaded variant applies no such delay, so the two timings are
    not a like-for-like comparison.

    Raises:
        requests.exceptions.RequestException: if a request fails or exceeds
            ``REQUEST_TIMEOUT``.
    """
    for url in urls:
        time.sleep(SLEEP)
        status = requests.get(url, timeout=REQUEST_TIMEOUT).status_code
        print(f"URL : {url}\nStatus : {status}")


def get_status_threading(urls: list[str]) -> dict[str, int]:
    """Fetch every URL in its own thread and return ``{url: status_code}``.

    One thread is started per URL and all of them are joined before returning.
    A URL whose request raises is absent from the returned mapping, because
    the exception terminates only that worker thread.
    """
    results: dict[str, int] = {}

    def fetch_url(url: str) -> None:
        tag = _worker_tag()
        print(f"{tag} => Fetching {url}...")
        status = requests.get(url, timeout=REQUEST_TIMEOUT).status_code
        results[url] = status
        print(f"{tag} => {url}: {status}")

    threads = []
    for url in urls:
        thread = Thread(target=fetch_url, args=(url,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return results


###### Part B: CPU-bound


def calculer_hash(donnee):
    """Return the SHA-256 hex digest of ``donnee`` after a heavy warm-up loop.

    The 500,000 throwaway digests exist only to make the call CPU-intensive.
    """
    for _ in range(500_000):  # intentionally heavy CPU work
        hashlib.sha256(donnee.encode()).hexdigest()
    return hashlib.sha256(donnee.encode()).hexdigest()


DONNEES = ["alpha", "beta", "gamma", "delta"]


def hash_calculus(data, results):
    """Hash ``data`` and store the digest in ``results[data]``.

    Module-level so that it can serve as the target of both ``Thread`` and
    ``Process`` workers; ``results`` is any dict-like mapping (a plain dict
    for threads, a ``Manager().dict()`` proxy for processes).
    """
    tag = _worker_tag()
    print(f"{tag} => Hashing {data}...")
    result = calculer_hash(data)
    results[data] = result
    print(f"{tag} => {data}: {result}")


def get_hash_sequential(data_set: list[str]) -> list[str]:
    """Hash each item in order, print each digest, and return them as a list."""
    hash_results = []
    for data in data_set:
        hash_result = calculer_hash(data)
        print(hash_result)
        hash_results.append(hash_result)
    return hash_results


def get_hash_threading(data_set: list[str]) -> dict[str, str]:
    """Hash every item in its own thread and return ``{item: digest}``."""
    results: dict[str, str] = {}

    threads = []
    for data in data_set:
        thread = Thread(target=hash_calculus, args=(data, results))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return results


def get_hash_multiprocessing(data_set: list[str]) -> dict[str, str]:
    """Hash every item in its own process and return ``{item: digest}``.

    A ``Manager`` dict collects the digests across processes; it is copied
    into a plain dict before the manager shuts down.
    """
    with Manager() as manager:
        results = manager.dict()

        processes = []
        for data in data_set:
            process = Process(target=hash_calculus, args=(data, results))
            processes.append(process)
            process.start()

        for process in processes:
            process.join()

        return dict(results)


def _timed_run(name: str, func, items: list[str]) -> float:
    """Run ``func(items)`` under a banner, print the elapsed time, return it."""
    print(f"\n--- {name} Approach ---")
    start_time = time.perf_counter()
    func(items)
    elapsed = time.perf_counter() - start_time
    print(f"{name} time: {elapsed:.2f} seconds")
    return elapsed


if __name__ == "__main__":
    # Part A: IO-bound comparison.
    _timed_run("Sequential", get_status_sequential, URLS)
    _timed_run("Threading", get_status_threading, URLS)

    # Part B: CPU-bound comparison.
    _timed_run("Sequential", get_hash_sequential, DONNEES)
    _timed_run("Threading", get_hash_threading, DONNEES)
    _timed_run("MultiProcessing", get_hash_multiprocessing, DONNEES)