Files
TermNSI/Processus/Corrige_TP_Ordonnanceur.py

391 lines
12 KiB
Python

"""
Corrigé du TP Simulateur d'Ordonnanceur
"""
# =============================================================================
# PARTIE 1 : Classe Processus
# =============================================================================
class Processus:
"""Représente un processus système."""
compteur_pid = 0
def __init__(self, nom, duree_totale, priorite=0):
"""
Constructeur de la classe Processus.
:param nom: (str) Nom du processus
:param duree_totale: (int) Durée totale d'exécution
:param priorite: (int) Priorité (0 = normale)
"""
Processus.compteur_pid += 1
self.pid = Processus.compteur_pid
self.nom = nom
self.duree_totale = duree_totale
self.duree_restante = duree_totale
self.etat = "nouveau"
self.priorite = priorite
self.temps_arrivee = 0
self.temps_debut = None
self.temps_fin = None
def executer(self, quantum):
"""
Exécute le processus pendant quantum unités de temps.
:param quantum: (int) Temps d'exécution alloué
:return: (int) Temps réellement utilisé
"""
temps_utilise = min(quantum, self.duree_restante)
self.duree_restante -= temps_utilise
return temps_utilise
def est_termine(self):
"""Vérifie si le processus est terminé."""
return self.duree_restante <= 0
def __repr__(self):
return f"[PID {self.pid}] {self.nom} ({self.etat}) - {self.duree_restante}/{self.duree_totale}"
# =============================================================================
# PARTIE 2 : Ordonnanceur FIFO
# =============================================================================
class OrdonnanceurFIFO:
"""Ordonnanceur First In First Out."""
def __init__(self):
self.file_prets = []
self.historique = []
self.temps = 0
self.processus_termines = []
def ajouter_processus(self, processus):
"""Ajoute un processus à la file."""
processus.etat = "pret"
processus.temps_arrivee = self.temps
self.file_prets.append(processus)
print(f"[T={self.temps}] {processus.nom} ajouté (durée: {processus.duree_totale})")
def executer_suivant(self):
"""Exécute le prochain processus jusqu'à la fin."""
if len(self.file_prets) == 0:
return False
processus = self.file_prets.pop(0)
processus.etat = "elu"
processus.temps_debut = self.temps
print(f"\n[T={self.temps}] >>> {processus.nom} commence")
while not processus.est_termine():
temps_utilise = processus.executer(1)
self.temps += temps_utilise
self.historique.append(processus.nom[0])
processus.etat = "termine"
processus.temps_fin = self.temps
self.processus_termines.append(processus)
print(f"[T={self.temps}] <<< {processus.nom} terminé")
return True
def executer_tous(self):
"""Exécute tous les processus."""
print(f"\n{'='*50}")
print("ORDONNANCEUR FIFO")
print(f"{'='*50}")
while self.executer_suivant():
pass
self.afficher_resultats()
def afficher_resultats(self):
"""Affiche les résultats de l'ordonnancement."""
print(f"\n{'='*50}")
print(f"Temps total : {self.temps}")
print(f"Diagramme : {''.join(self.historique)}")
print(f"{'='*50}")
print("\nStatistiques par processus :")
temps_attente_total = 0
temps_rotation_total = 0
for p in self.processus_termines:
rotation = p.temps_fin - p.temps_arrivee
attente = rotation - p.duree_totale
temps_attente_total += attente
temps_rotation_total += rotation
print(f" {p.nom}: rotation={rotation}, attente={attente}")
n = len(self.processus_termines)
print(f"\nMoyennes: rotation={temps_rotation_total/n:.1f}, attente={temps_attente_total/n:.1f}")
# =============================================================================
# PARTIE 3 : Ordonnanceur Round Robin
# =============================================================================
class OrdonnanceurRoundRobin:
"""Ordonnanceur Round Robin (tourniquet)."""
def __init__(self, quantum=2):
self.file_prets = []
self.quantum = quantum
self.temps = 0
self.historique = []
self.processus_termines = []
def ajouter_processus(self, processus):
"""Ajoute un processus à la file."""
processus.etat = "pret"
processus.temps_arrivee = self.temps
self.file_prets.append(processus)
def executer_cycle(self):
"""Exécute un cycle d'ordonnancement."""
if len(self.file_prets) == 0:
return False
processus = self.file_prets.pop(0)
processus.etat = "elu"
if processus.temps_debut is None:
processus.temps_debut = self.temps
temps_utilise = processus.executer(self.quantum)
self.temps += temps_utilise
for _ in range(temps_utilise):
self.historique.append(processus.nom[0])
if processus.est_termine():
processus.etat = "termine"
processus.temps_fin = self.temps
self.processus_termines.append(processus)
print(f"[T={self.temps}] {processus.nom} TERMINÉ")
else:
processus.etat = "pret"
self.file_prets.append(processus)
print(f"[T={self.temps}] {processus.nom} → file ({processus.duree_restante} restant)")
return True
def executer_tous(self):
"""Exécute tous les processus."""
print(f"\n{'='*50}")
print(f"ORDONNANCEUR ROUND ROBIN (quantum={self.quantum})")
print(f"{'='*50}\n")
while self.executer_cycle():
pass
self.afficher_resultats()
def afficher_resultats(self):
"""Affiche les résultats."""
print(f"\n{'='*50}")
print(f"Temps total : {self.temps}")
print(f"Diagramme : {''.join(self.historique)}")
print(f"{'='*50}")
print("\nStatistiques par processus :")
temps_attente_total = 0
temps_rotation_total = 0
temps_reponse_total = 0
for p in self.processus_termines:
rotation = p.temps_fin - p.temps_arrivee
attente = rotation - p.duree_totale
reponse = p.temps_debut - p.temps_arrivee
temps_attente_total += attente
temps_rotation_total += rotation
temps_reponse_total += reponse
print(f" {p.nom}: rotation={rotation}, attente={attente}, réponse={reponse}")
n = len(self.processus_termines)
print(f"\nMoyennes: rotation={temps_rotation_total/n:.1f}, attente={temps_attente_total/n:.1f}, réponse={temps_reponse_total/n:.1f}")
# =============================================================================
# PARTIE 4 : Ressources et Interblocage
# =============================================================================
class Ressource:
"""Représente une ressource partagée."""
def __init__(self, nom):
self.nom = nom
self.proprietaire = None
def est_disponible(self):
return self.proprietaire is None
def acquerir(self, pid):
if self.est_disponible():
self.proprietaire = pid
return True
return False
def liberer(self):
self.proprietaire = None
def __repr__(self):
if self.est_disponible():
return f"[{self.nom}] LIBRE"
return f"[{self.nom}] → PID {self.proprietaire}"
class ProcessusAvecRessources(Processus):
"""Processus qui nécessite des ressources."""
def __init__(self, nom, duree_totale, ressources_requises):
super().__init__(nom, duree_totale)
self.ressources_requises = ressources_requises
self.ressources_obtenues = []
def demander_ressources(self, ressources_dispo):
"""Tente d'obtenir les ressources nécessaires."""
for nom_res in self.ressources_requises:
if nom_res not in self.ressources_obtenues:
ressource = ressources_dispo.get(nom_res)
if ressource and ressource.acquerir(self.pid):
self.ressources_obtenues.append(nom_res)
print(f"{self.nom} obtient {nom_res}")
else:
print(f"{self.nom} BLOQUÉ sur {nom_res}")
self.etat = "bloque"
return False
return True
def liberer_ressources(self, ressources_dispo):
"""Libère toutes les ressources."""
for nom_res in self.ressources_obtenues:
ressources_dispo[nom_res].liberer()
print(f"{self.nom} libère {nom_res}")
self.ressources_obtenues = []
def detecter_interblocage(processus_list, ressources):
"""Détecte un interblocage."""
processus_actifs = [p for p in processus_list if p.etat != "termine"]
if len(processus_actifs) == 0:
return False
tous_bloques = all(p.etat == "bloque" for p in processus_actifs)
if tous_bloques:
print("\n" + "!"*50)
print("!!! INTERBLOCAGE DÉTECTÉ !!!")
print("!"*50)
for p in processus_actifs:
attend = [r for r in p.ressources_requises if r not in p.ressources_obtenues]
print(f" {p.nom}: possède {p.ressources_obtenues}, attend {attend}")
return True
return False
# =============================================================================
# PARTIE 5 : Comparaison
# =============================================================================
def comparer_ordonnanceurs():
"""Compare FIFO et Round Robin."""
processus_test = [
("Navigateur", 8),
("Editeur", 4),
("Terminal", 2),
("Musique", 6)
]
print("\n" + "="*60)
print("COMPARAISON FIFO vs ROUND ROBIN")
print("="*60)
# Test FIFO
Processus.compteur_pid = 0
fifo = OrdonnanceurFIFO()
for nom, duree in processus_test:
fifo.ajouter_processus(Processus(nom, duree))
fifo.executer_tous()
# Test Round Robin
Processus.compteur_pid = 0
rr = OrdonnanceurRoundRobin(quantum=2)
for nom, duree in processus_test:
rr.ajouter_processus(Processus(nom, duree))
rr.executer_tous()
# =============================================================================
# TESTS
# =============================================================================
if __name__ == "__main__":
print("="*60)
print("TEST 1 : ORDONNANCEUR FIFO")
print("="*60)
Processus.compteur_pid = 0
fifo = OrdonnanceurFIFO()
fifo.ajouter_processus(Processus("Firefox", 5))
fifo.ajouter_processus(Processus("VSCode", 3))
fifo.ajouter_processus(Processus("Spotify", 2))
fifo.executer_tous()
print("\n" + "="*60)
print("TEST 2 : ORDONNANCEUR ROUND ROBIN")
print("="*60)
Processus.compteur_pid = 0
rr = OrdonnanceurRoundRobin(quantum=2)
rr.ajouter_processus(Processus("Firefox", 6))
rr.ajouter_processus(Processus("VSCode", 4))
rr.ajouter_processus(Processus("Spotify", 2))
rr.executer_tous()
print("\n" + "="*60)
print("TEST 3 : SIMULATION D'INTERBLOCAGE")
print("="*60)
ressources = {
"A": Ressource("A"),
"B": Ressource("B")
}
Processus.compteur_pid = 0
p1 = ProcessusAvecRessources("Prog1", 5, ["A", "B"])
p2 = ProcessusAvecRessources("Prog2", 5, ["B", "A"])
print("\nÉtat initial des ressources:")
for r in ressources.values():
print(f" {r}")
print("\nTour 1: Prog1 demande ses ressources")
p1.demander_ressources(ressources)
print("\nTour 2: Prog2 demande ses ressources")
p2.demander_ressources(ressources)
print("\nTour 3: Prog1 continue")
p1.demander_ressources(ressources)
print("\nTour 4: Prog2 continue")
p2.demander_ressources(ressources)
detecter_interblocage([p1, p2], ressources)
print("\n" + "="*60)
print("TEST 4 : COMPARAISON DES ALGORITHMES")
print("="*60)
comparer_ordonnanceurs()