"""Peewee ORM walkthrough against a MySQL ``company`` database.

Defines the ``Department`` and ``Employee`` models, creates their tables,
seeds one department and one employee, then demonstrates lookups, filtering,
pagination, counting and joins.

Connection settings are read from the environment variables ``DB_NAME``,
``DB_USER``, ``DB_PASSWORD``, ``DB_HOST`` and ``DB_PORT``; each one falls
back to the value this script originally hard-coded.
"""
import os

from peewee import *

# Connection settings: environment first, original literals as fallback.
db_name = os.environ.get("DB_NAME", "company")
db_user = os.environ.get("DB_USER", "root")
# NOTE(review): a real password should not live in source control. The
# fallback is kept only so existing setups keep working; set DB_PASSWORD,
# then remove this literal and rotate the credential.
db_password = os.environ.get("DB_PASSWORD", "MementoMarsouin96")
db_host = os.environ.get("DB_HOST", "localhost")
# Environment values are strings; the driver expects an integer port.
db_port = int(os.environ.get("DB_PORT", "3306"))

db = MySQLDatabase(
    database=db_name,
    user=db_user,
    password=db_password,
    host=db_host,
    port=db_port,
)


# Equivalent of SQLAlchemy's "Base = declarative_base()".
class BaseModel(Model):
    """Common base class binding every model to ``db``."""

    class Meta:
        database = db


class Department(BaseModel):
    """A department, stored in the ``departments`` table."""

    department_id = AutoField(primary_key=True)
    department_name = CharField(max_length=50, null=False, unique=True)
    location_id = DecimalField(max_digits=4, decimal_places=0, null=False)

    # Equivalent of SQLAlchemy's '__tablename__ = "..."'.
    class Meta:
        table_name = "departments"


class Employee(BaseModel):
    """An employee, stored in the ``employees`` table.

    Every employee must reference a department (``null=False``); the
    reverse accessor on ``Department`` is ``employees``.
    """

    employee_id = AutoField(primary_key=True)
    first_name = CharField(max_length=50, null=False)
    last_name = CharField(max_length=50, null=False)
    email = CharField(max_length=50, null=False, unique=True)
    salary = IntegerField(null=False)
    department = ForeignKeyField(Department, backref="employees", null=False)

    class Meta:
        table_name = "employees"


db.connect()
db.create_tables([Department, Employee])

# Seed the IT department; on a re-run the unique name raises IntegrityError,
# in which case the existing row is fetched instead.
try:
    it_dept = Department.create(department_name="IT", location_id=1400)
except IntegrityError:
    print("Département déjà existant.")
    it_dept = Department.get(Department.department_name == "IT")

# Seed one employee; the unique email makes a second run raise IntegrityError.
try:
    Employee.create(
        first_name="Alice",
        last_name="Smith",
        email="a.smith@company.au",
        salary=4000,
        department=it_dept,  # pass the model OBJECT, not the department name
    )
except IntegrityError:
    print("Employé(e) déjà existant(e).")

# Sample rows for a bulk insert.
# NOTE(review): these dicts carry no 'department' value although the column
# is declared null=False, so the insert below would be rejected as written.
new_employees = [
    {'first_name': 'Marie', 'last_name': 'Dupont', 'email': 'mdupont', 'salary': 4500},
    {'first_name': 'Jean', 'last_name': 'Martin', 'email': 'jmartin', 'salary': 5200},
    {'first_name': 'Sophie', 'last_name': 'Bernard', 'email': 'sbernard', 'salary': 4800},
]

# Bulk insert, left disabled:
# with db.atomic():
#     Employee.insert_many(new_employees).execute()

# Lookup by primary key (get_by_id states the intent explicitly).
employee_01 = Employee.get_by_id(1)
print(employee_01.first_name, employee_01.last_name)

# Lookup by an arbitrary expression.
employee_02 = Employee.get(Employee.email == "a.smith@company.au")
print(employee_02.first_name, employee_02.salary)

# A lookup that finds no row raises DoesNotExist.
try:
    employee_76 = Employee.get_by_id(76)
except DoesNotExist:
    print("L'employé(e) 76 n'existe pas.")

employee_select_01 = Employee.select()  # SELECT * FROM employees
print(employee_select_01)  # prints the query object itself, not its rows

for employee in employee_select_01:
    print(employee.first_name, employee.last_name, employee.email)

# Combine conditions with & (each side needs its own parentheses).
employee_select_02 = Employee.select().where(
    (Employee.salary > 8000) & (Employee.department == it_dept)
)
print(employee_select_02)

# Pagination: page 1, 10 rows per page.
page_01 = Employee.select().order_by(Employee.employee_id).paginate(1, 10)

# Counting matching rows.
salary_01 = Employee.select().where(Employee.salary > 10000).count()

# Joins (INNER JOIN, LEFT OUTER JOIN, ...)
# INNER JOIN: .join() without a join type is an inner join in peewee, so
# only employees that have a matching department are selected.
query = (
    Employee
    .select(Employee, Department)
    .join(Department)
    .where(Department.department_name == "IT")
)

# LEFT OUTER JOIN: meant to cover employees assigned to a department as well
# as those that are not.
# NOTE(review): the WHERE clause filters on a Department column, which drops
# the rows where the department side is NULL, so this selects the same rows
# as the inner join above. Confirm the intended filter before relying on it.
query = (
    Employee
    .select(Employee, Department)
    .join(Department, JOIN.LEFT_OUTER)
    .where(Department.department_name == "IT")
)

# Release the connection explicitly now that the walkthrough is finished.
db.close()