35 lines
1.1 KiB
Python
35 lines
1.1 KiB
Python
import time
|
|
|
|
class CircuitBreaker:
|
|
def __init__(self, failure_threshold: int = 3, recovery_timeout: int = 10) -> None:
|
|
self.failure_threshold = failure_threshold
|
|
self.recovery_timeout = recovery_timeout
|
|
self.failure_count = 0
|
|
self.last_failure_time = None
|
|
self.state = "CLOSED" # OPEN, CLOSED, HALF-OPEN
|
|
|
|
def call(self, function, *args, **kwargs):
|
|
if self.state == "OPEN" and self.last_failure_time is not None:
|
|
if time.time() - self.last_failure_time >= self.recovery_timeout:
|
|
self.state = "HALF-OPEN"
|
|
else:
|
|
raise Exception("Circuit is open")
|
|
|
|
try:
|
|
result = function(*args, **kwargs)
|
|
self.on_success()
|
|
return result
|
|
except Exception as error:
|
|
self.on_failure()
|
|
raise error
|
|
|
|
def on_success(self):
|
|
self.failure_count = 0
|
|
self.state = "CLOSED"
|
|
|
|
def on_failure(self):
|
|
self.failure_count += 1
|
|
self.last_failure_time = time.time()
|
|
if self.failure_count >= self.failure_threshold:
|
|
self.state = "OPEN"
|