474 lines
13 KiB
Markdown
474 lines
13 KiB
Markdown
# TP : Station Météo IoT — Simulation d'un système embarqué
|
|
|
|
## Contexte
|
|
|
|
Vous travaillez pour une startup spécialisée dans les objets connectés. Votre mission est de développer le logiciel embarqué d'une **station météo connectée** basée sur un SoC (System on Chip).
|
|
|
|
Cette station doit :
|
|
- Collecter des données de capteurs (température, humidité, pression)
|
|
- Traiter ces données localement (calculs de moyennes, détection d'anomalies)
|
|
- Simuler l'envoi vers le cloud
|
|
- Optimiser la consommation énergétique
|
|
|
|
L'objectif est de comprendre comment fonctionne un système embarqué sur SoC et les contraintes associées.
|
|
|
|
---
|
|
|
|
## Objectifs pédagogiques
|
|
|
|
- Comprendre le fonctionnement d'un système embarqué
|
|
- Manipuler des données de capteurs simulés
|
|
- Implémenter des algorithmes de traitement de données
|
|
- Appréhender les contraintes de ressources limitées
|
|
- Découvrir les concepts de l'IoT
|
|
|
|
---
|
|
|
|
## Partie 1 : Simulation des capteurs
|
|
|
|
### 1.1. Classe Capteur
|
|
|
|
Un capteur embarqué a des caractéristiques spécifiques :
|
|
- Un nom (température, humidité, etc.)
|
|
- Une unité de mesure
|
|
- Une plage de valeurs valides
|
|
- Une précision (nombre de décimales)
|
|
|
|
Créer la classe `Capteur` :
|
|
|
|
```python
|
|
import random
|
|
import time
|
|
|
|
class Capteur:
|
|
"""Simule un capteur embarqué sur SoC."""
|
|
|
|
def __init__(self, nom, unite, val_min, val_max, precision=1):
|
|
"""
|
|
Initialise un capteur.
|
|
|
|
:param nom: (str) Nom du capteur
|
|
:param unite: (str) Unité de mesure
|
|
:param val_min: (float) Valeur minimale possible
|
|
:param val_max: (float) Valeur maximale possible
|
|
:param precision: (int) Nombre de décimales
|
|
"""
|
|
self.nom = nom
|
|
self.unite = unite
|
|
self.val_min = val_min
|
|
self.val_max = val_max
|
|
self.precision = precision
|
|
self._derniere_valeur = None
|
|
|
|
def lire(self):
|
|
"""
|
|
Simule une lecture du capteur.
|
|
Retourne une valeur réaliste (proche de la précédente).
|
|
|
|
:return: (float) Valeur mesurée
|
|
"""
|
|
# À compléter
|
|
pass
|
|
|
|
def __repr__(self):
|
|
return f"Capteur({self.nom}, {self.unite})"
|
|
```
|
|
|
|
**Consignes pour la méthode `lire()` :**
|
|
- Si c'est la première lecture, générer une valeur aléatoire dans la plage
|
|
- Sinon, générer une valeur proche de la précédente (±10% de la plage)
|
|
- Arrondir selon la précision
|
|
- S'assurer que la valeur reste dans les limites
|
|
|
|
### 1.2. Création des capteurs de la station
|
|
|
|
Créer trois capteurs pour notre station :
|
|
|
|
```python
|
|
# Capteur de température : -20°C à 50°C, précision 0.1°C
|
|
capteur_temp = Capteur("Température", "°C", -20, 50, precision=1)
|
|
|
|
# Capteur d'humidité : 0% à 100%, précision 1%
|
|
capteur_hum = Capteur("Humidité", "%", 0, 100, precision=0)
|
|
|
|
# Capteur de pression : 950 hPa à 1050 hPa, précision 0.1 hPa
|
|
capteur_pres = Capteur("Pression", "hPa", 950, 1050, precision=1)
|
|
```
|
|
|
|
**Test :**
|
|
```python
|
|
>>> for _ in range(5):
|
|
... print(f"T={capteur_temp.lire()}°C, H={capteur_hum.lire()}%, P={capteur_pres.lire()}hPa")
|
|
T=22.3°C, H=65%, P=1013.2hPa
|
|
T=23.1°C, H=67%, P=1012.8hPa
|
|
T=22.8°C, H=66%, P=1013.5hPa
|
|
...
|
|
```
|
|
|
|
---
|
|
|
|
## Partie 2 : Gestionnaire de données
|
|
|
|
### 2.1. Buffer circulaire
|
|
|
|
Les systèmes embarqués ont une mémoire limitée. On utilise un **buffer circulaire** pour stocker les N dernières mesures sans allouer de mémoire dynamiquement.
|
|
|
|
```python
|
|
class BufferCirculaire:
|
|
"""
|
|
Buffer circulaire pour stocker les dernières mesures.
|
|
Mémoire fixe, adapté aux systèmes embarqués.
|
|
"""
|
|
|
|
def __init__(self, taille):
|
|
"""
|
|
Initialise le buffer.
|
|
|
|
:param taille: (int) Nombre maximum d'éléments
|
|
"""
|
|
self.taille = taille
|
|
self.donnees = [None] * taille # Pré-allocation
|
|
self.index = 0
|
|
self.compte = 0
|
|
|
|
def ajouter(self, valeur):
|
|
"""
|
|
Ajoute une valeur au buffer.
|
|
Écrase la plus ancienne si plein.
|
|
|
|
:param valeur: La valeur à ajouter
|
|
"""
|
|
# À compléter
|
|
pass
|
|
|
|
def moyenne(self):
|
|
"""
|
|
Calcule la moyenne des valeurs stockées.
|
|
|
|
:return: (float) Moyenne ou None si vide
|
|
"""
|
|
# À compléter
|
|
pass
|
|
|
|
def min_max(self):
|
|
"""
|
|
Retourne le minimum et le maximum.
|
|
|
|
:return: (tuple) (min, max) ou (None, None) si vide
|
|
"""
|
|
# À compléter
|
|
pass
|
|
|
|
def est_plein(self):
|
|
"""Vérifie si le buffer est plein."""
|
|
return self.compte >= self.taille
|
|
|
|
def __len__(self):
|
|
return self.compte
|
|
```
|
|
|
|
**Principe du buffer circulaire :**
|
|
```
|
|
Taille = 5, après ajout de 1, 2, 3, 4, 5, 6, 7 :
|
|
|
|
Étape 1-5 : [1, 2, 3, 4, 5] (remplissage normal)
|
|
Étape 6 : [6, 2, 3, 4, 5] (6 écrase 1)
|
|
Étape 7 : [6, 7, 3, 4, 5] (7 écrase 2)
|
|
|
|
L'index "tourne" : index = (index + 1) % taille
|
|
```
|
|
|
|
---
|
|
|
|
## Partie 3 : Station météo complète
|
|
|
|
### 3.1. Classe StationMeteo
|
|
|
|
Assembler les composants pour créer la station :
|
|
|
|
```python
|
|
class StationMeteo:
|
|
"""
|
|
Station météo IoT simulant un système sur SoC.
|
|
"""
|
|
|
|
def __init__(self, nom, taille_buffer=10):
|
|
"""
|
|
Initialise la station.
|
|
|
|
:param nom: (str) Identifiant de la station
|
|
:param taille_buffer: (int) Taille des buffers de données
|
|
"""
|
|
self.nom = nom
|
|
|
|
# Capteurs
|
|
self.capteurs = {
|
|
'temperature': Capteur("Température", "°C", -20, 50, 1),
|
|
'humidite': Capteur("Humidité", "%", 0, 100, 0),
|
|
'pression': Capteur("Pression", "hPa", 950, 1050, 1)
|
|
}
|
|
|
|
# Buffers pour chaque capteur
|
|
self.buffers = {
|
|
nom: BufferCirculaire(taille_buffer)
|
|
for nom in self.capteurs
|
|
}
|
|
|
|
# Compteur de mesures
|
|
self.nb_mesures = 0
|
|
|
|
# Mode économie d'énergie
|
|
self.mode_eco = False
|
|
|
|
def mesurer(self):
|
|
"""
|
|
Effectue une mesure sur tous les capteurs.
|
|
|
|
:return: (dict) Dictionnaire des mesures
|
|
"""
|
|
# À compléter
|
|
pass
|
|
|
|
def statistiques(self):
|
|
"""
|
|
Calcule les statistiques sur les données collectées.
|
|
|
|
:return: (dict) Statistiques par capteur
|
|
"""
|
|
# À compléter
|
|
pass
|
|
|
|
def detecter_anomalies(self):
|
|
"""
|
|
Détecte les valeurs anormales.
|
|
Une anomalie est une valeur qui s'écarte de plus de 2 écarts-types
|
|
de la moyenne.
|
|
|
|
:return: (list) Liste des anomalies détectées
|
|
"""
|
|
# À compléter (bonus)
|
|
pass
|
|
|
|
def rapport(self):
|
|
"""
|
|
Génère un rapport formaté.
|
|
|
|
:return: (str) Rapport textuel
|
|
"""
|
|
# À compléter
|
|
pass
|
|
```
|
|
|
|
### 3.2. Simulation d'envoi cloud
|
|
|
|
Dans un vrai système IoT, les données sont envoyées vers le cloud. Simuler cette fonctionnalité :
|
|
|
|
```python
|
|
def envoyer_cloud(self, donnees):
|
|
"""
|
|
Simule l'envoi de données vers le cloud.
|
|
En mode éco, regroupe les envois.
|
|
|
|
:param donnees: (dict) Données à envoyer
|
|
:return: (bool) Succès de l'envoi
|
|
"""
|
|
# Simuler une latence réseau
|
|
import time
|
|
time.sleep(0.1) # 100ms de latence simulée
|
|
|
|
# Simuler un échec occasionnel (5% de chance)
|
|
if random.random() < 0.05:
|
|
print(f"[{self.nom}] Échec d'envoi - Nouvelle tentative...")
|
|
return False
|
|
|
|
# Afficher les données "envoyées"
|
|
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
|
|
print(f"[{self.nom}] Envoi cloud @ {timestamp}")
|
|
for cle, valeur in donnees.items():
|
|
print(f" {cle}: {valeur}")
|
|
|
|
return True
|
|
```
|
|
|
|
---
|
|
|
|
## Partie 4 : Gestion de l'énergie
|
|
|
|
### 4.1. Mode économie d'énergie
|
|
|
|
Les systèmes embarqués doivent économiser l'énergie. Implémenter un mode économie :
|
|
|
|
```python
|
|
def activer_mode_eco(self):
|
|
"""
|
|
Active le mode économie d'énergie.
|
|
- Réduit la fréquence des mesures
|
|
- Regroupe les envois cloud
|
|
"""
|
|
self.mode_eco = True
|
|
print(f"[{self.nom}] Mode économie activé")
|
|
|
|
def desactiver_mode_eco(self):
|
|
"""Désactive le mode économie d'énergie."""
|
|
self.mode_eco = False
|
|
print(f"[{self.nom}] Mode économie désactivé")
|
|
```
|
|
|
|
### 4.2. Simulation de consommation
|
|
|
|
Ajouter un suivi de la consommation énergétique simulée :
|
|
|
|
```python
|
|
class StationMeteo:
|
|
# Constantes de consommation (en mW)
|
|
CONSO_VEILLE = 0.5
|
|
CONSO_MESURE = 10
|
|
CONSO_ENVOI = 50
|
|
CONSO_CALCUL = 5
|
|
|
|
def __init__(self, ...):
|
|
...
|
|
self.energie_totale = 0 # mWh consommés
|
|
|
|
def _consommer(self, activite, duree_ms):
|
|
"""
|
|
Enregistre la consommation d'énergie.
|
|
|
|
:param activite: (str) Type d'activité
|
|
:param duree_ms: (int) Durée en millisecondes
|
|
"""
|
|
conso = {
|
|
'veille': self.CONSO_VEILLE,
|
|
'mesure': self.CONSO_MESURE,
|
|
'envoi': self.CONSO_ENVOI,
|
|
'calcul': self.CONSO_CALCUL
|
|
}
|
|
# Convertir en mWh : mW * (ms / 3600000)
|
|
self.energie_totale += conso[activite] * (duree_ms / 3600000)
|
|
```
|
|
|
|
---
|
|
|
|
## Partie 5 : Programme principal
|
|
|
|
### 5.1. Boucle de fonctionnement
|
|
|
|
Créer le programme principal qui simule le fonctionnement de la station :
|
|
|
|
```python
|
|
def main():
|
|
"""Programme principal de la station météo."""
|
|
|
|
# Créer la station
|
|
station = StationMeteo("STATION_01", taille_buffer=20)
|
|
|
|
print("=" * 50)
|
|
print(" STATION MÉTÉO IoT - Simulation SoC")
|
|
print("=" * 50)
|
|
|
|
# Simuler 1 heure de fonctionnement (1 mesure par minute = 60 mesures)
|
|
nb_mesures = 60
|
|
intervalle = 1 # Secondes entre les mesures (accéléré pour la simulation)
|
|
|
|
try:
|
|
for i in range(nb_mesures):
|
|
# Effectuer une mesure
|
|
mesure = station.mesurer()
|
|
print(f"\nMesure {i+1}/{nb_mesures}: T={mesure['temperature']}°C, "
|
|
f"H={mesure['humidite']}%, P={mesure['pression']}hPa")
|
|
|
|
# Envoyer au cloud toutes les 10 mesures
|
|
if (i + 1) % 10 == 0:
|
|
stats = station.statistiques()
|
|
station.envoyer_cloud(stats)
|
|
|
|
# Attendre avant la prochaine mesure
|
|
time.sleep(intervalle)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\nArrêt demandé par l'utilisateur.")
|
|
|
|
# Rapport final
|
|
print("\n" + "=" * 50)
|
|
print("RAPPORT FINAL")
|
|
print("=" * 50)
|
|
print(station.rapport())
|
|
print(f"Énergie consommée : {station.energie_totale:.4f} mWh")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
```
|
|
|
|
---
|
|
|
|
## Partie 6 : Extensions (Bonus)
|
|
|
|
### 6.1. Détection d'anomalies
|
|
|
|
Implémenter la méthode `detecter_anomalies()` qui identifie les valeurs aberrantes.
|
|
|
|
### 6.2. Persistance locale
|
|
|
|
Ajouter une fonctionnalité de sauvegarde locale (fichier JSON) pour conserver les données en cas de perte de connexion :
|
|
|
|
```python
|
|
def sauvegarder_local(self, fichier="backup.json"):
|
|
"""Sauvegarde les données en local (simulation de mémoire flash)."""
|
|
pass
|
|
|
|
def charger_local(self, fichier="backup.json"):
|
|
"""Charge les données depuis la sauvegarde locale."""
|
|
pass
|
|
```
|
|
|
|
### 6.3. Protocole de communication
|
|
|
|
Simuler un protocole de communication simple (type MQTT) :
|
|
|
|
```python
|
|
def publier(self, topic, message):
|
|
"""Publie un message sur un topic (simulation MQTT)."""
|
|
print(f"[MQTT] {topic} -> {message}")
|
|
```
|
|
|
|
### 6.4. Multi-stations
|
|
|
|
Créer un réseau de plusieurs stations qui communiquent entre elles et agrègent leurs données.
|
|
|
|
---
|
|
|
|
## Questions de synthèse
|
|
|
|
1. Pourquoi utilise-t-on un buffer circulaire plutôt qu'une liste dynamique sur un système embarqué ?
|
|
|
|
2. Quels sont les compromis à faire entre fréquence de mesure et consommation énergétique ?
|
|
|
|
3. Comment un vrai système IoT gère-t-il la perte de connexion réseau ?
|
|
|
|
4. Quelles sont les différences entre le code Python de simulation et le code réel sur un microcontrôleur ?
|
|
|
|
5. Comment l'architecture SoC permet-elle d'optimiser la consommation dans ce type d'application ?
|
|
|
|
---
|
|
|
|
## Barème indicatif
|
|
|
|
| Partie | Points |
|
|
|--------|--------|
|
|
| Partie 1 : Capteurs | 3 |
|
|
| Partie 2 : Buffer circulaire | 4 |
|
|
| Partie 3 : Station complète | 5 |
|
|
| Partie 4 : Gestion énergie | 3 |
|
|
| Partie 5 : Programme principal | 3 |
|
|
| Partie 6 : Bonus | 2 |
|
|
| **Total** | **20** |
|
|
|
|
---
|
|
|
|
Auteur : Florian Mathieu
|
|
|
|
Licence CC BY NC
|
|
|
|
<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Licence Creative Commons" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a> <br />Ce cours est mis à disposition selon les termes de la <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Licence Creative Commons Attribution - Pas d'Utilisation Commerciale - Partage dans les Mêmes Conditions 4.0 International</a>.
|