🎯 Exemples recommandés
Collections d'exemples équilibrées, issues de différentes catégories, à explorer
Protocole de Messages MQTT
Exemples du protocole MQTT (Message Queuing Telemetry Transport) pour IoT et messagerie en temps réel avec des modèles publication/abonnement
💻 Concepts et Utilisation de Base MQTT javascript
🟢 simple
⭐⭐
Introduction au protocole MQTT avec des exemples de base de publisher et subscriber
⏱️ 15 min
🏷️ mqtt, iot, messaging, publish-subscribe
Prerequisites:
Node.js basics, Understanding of pub/sub patterns
// MQTT Basic Examples using mqtt.js library
// npm install mqtt
const mqtt = require('mqtt');
// MQTT broker connection options shared by every example in this file.
const options = {
  // Public Eclipse sandbox broker. The former host `mqtt.eclipse.org` has been
  // retired; the sandbox is now served from mqtt.eclipseprojects.io.
  host: 'mqtt.eclipseprojects.io',
  port: 1883,
  protocol: 'mqtt',
  keepalive: 60,
  reconnectPeriod: 1000,
  connectTimeout: 30 * 1000,
  // Last Will: published by the broker on behalf of this client if the
  // connection is lost without a clean DISCONNECT.
  will: {
    topic: 'clients/disconnect',
    payload: 'Client disconnected unexpectedly',
    qos: 1,
    retain: false
  }
};
// 1. Basic Publisher Example
/**
 * Creates an MQTT client that publishes a simulated sensor reading to
 * `sensors/temperature/room1` every 2 seconds while connected.
 *
 * The 'connect' event fires again after every automatic reconnect, so the
 * publish timer is created at most once at a time and is cleared when the
 * connection closes. This avoids stacking duplicate timers and lets the
 * process exit once the client has been ended.
 *
 * @returns {object} The mqtt.js client returned by `mqtt.connect`.
 */
function createPublisher() {
  console.log('Creating MQTT Publisher...');
  const client = mqtt.connect(options);
  const topic = 'sensors/temperature/room1';
  let counter = 0;
  let publishTimer = null;

  // Builds one reading and publishes it with QoS 1 (at least once delivery).
  const publishReading = () => {
    const message = {
      deviceId: 'sensor_001',
      temperature: Math.random() * 30 + 20,
      humidity: Math.random() * 50 + 30,
      timestamp: new Date().toISOString(),
      counter: counter++
    };
    const payload = JSON.stringify(message);
    client.publish(topic, payload, { qos: 1, retain: false }, (error) => {
      if (error) {
        console.error('Failed to publish message:', error);
      } else {
        console.log(`Message published to ${topic}: ${payload}`);
      }
    });
  };

  client.on('connect', function () {
    console.log('Publisher connected to MQTT broker');
    if (publishTimer === null) {
      publishTimer = setInterval(publishReading, 2000); // Publish every 2 seconds
    }
  });

  client.on('close', function () {
    // Stop publishing while disconnected; 'connect' restarts the timer.
    if (publishTimer !== null) {
      clearInterval(publishTimer);
      publishTimer = null;
    }
  });

  client.on('error', function (error) {
    console.error('MQTT connection error:', error);
  });

  client.on('offline', function () {
    console.log('Publisher offline');
  });

  client.on('reconnect', function () {
    console.log('Publisher reconnecting...');
  });

  return client;
}
// 2. Basic Subscriber Example
/**
 * Creates an MQTT client that subscribes to the sensor, alert and client
 * topics and routes incoming messages to the matching handler.
 *
 * JSON parsing and handler dispatch are guarded separately so that an
 * exception thrown by a handler is reported as a handler error instead of
 * being mistaken for a non-JSON payload.
 *
 * @returns {object} The mqtt.js client returned by `mqtt.connect`.
 */
function createSubscriber() {
  console.log('Creating MQTT Subscriber...');
  const client = mqtt.connect(options);

  const topics = [
    'sensors/temperature/+', // Wildcard for all rooms
    'sensors/humidity/+',
    'alerts/+/critical',
    'clients/connect'
  ];

  client.on('connect', function () {
    console.log('Subscriber connected to MQTT broker');
    topics.forEach((topic) => {
      client.subscribe(topic, { qos: 1 }, (error, granted) => {
        if (error) {
          console.error(`Failed to subscribe to ${topic}:`, error);
          return;
        }
        // `granted` may be empty; never index into it blindly.
        const grant = Array.isArray(granted) ? granted[0] : undefined;
        if (!grant) {
          console.log(`Subscribed to ${topic}`);
        } else if (grant.qos === 128) {
          // 128 (0x80) is the SUBACK failure return code.
          console.error(`Broker rejected subscription to ${topic}`);
        } else {
          console.log(`Subscribed to ${topic} with QoS ${grant.qos}`);
        }
      });
    });
  });

  client.on('message', function (topic, message) {
    const text = message.toString();

    let payload;
    try {
      payload = JSON.parse(text);
    } catch (error) {
      // Not JSON: log the raw text and stop, handlers expect objects.
      console.log(`Received message from ${topic}: ${text}`);
      return;
    }
    console.log(`Received message from ${topic}:`, payload);

    try {
      // Process different types of messages
      if (topic.startsWith('sensors/temperature/')) {
        handleTemperatureData(topic, payload);
      } else if (topic.startsWith('alerts/')) {
        handleAlert(topic, payload);
      } else if (topic === 'clients/connect') {
        handleClientConnect(payload);
      }
    } catch (error) {
      console.error(`Error while handling message from ${topic}:`, error);
    }
  });

  client.on('error', function (error) {
    console.error('MQTT connection error:', error);
  });

  return client;
}
// 3. Message Handlers
/**
 * Logs a temperature reading and raises a console alert above 40 °C.
 *
 * The room name is taken from the third topic level
 * (`sensors/temperature/<room>`). Payloads without a numeric `temperature`
 * are reported with a warning and skipped instead of throwing.
 *
 * @param {string} topic - Topic the message arrived on.
 * @param {object} data - Parsed message payload; `data.temperature` is read.
 */
function handleTemperatureData(topic, data) {
  const room = topic.split('/')[2];
  const temperature = data == null ? undefined : data.temperature;

  if (typeof temperature !== 'number' || Number.isNaN(temperature)) {
    console.warn(`Ignoring temperature message for ${room}: missing or non-numeric reading`);
    return;
  }

  console.log(`🌡️ Temperature in ${room}: ${temperature.toFixed(2)}°C`);

  // Alert if temperature is too high
  if (temperature > 40) {
    console.log(`🚨 High temperature alert in ${room}!`);
  }
}
/**
 * Prints the message, severity and timestamp of an alert payload.
 *
 * @param {string} topic - Topic the alert arrived on (not used here).
 * @param {object} alert - Parsed alert with `message`, `severity`, `timestamp`.
 */
function handleAlert(topic, alert) {
  const { message, severity, timestamp } = alert;
  const lines = [
    `🚨 ALERT: ${message}`,
    ` Severity: ${severity}`,
    ` Timestamp: ${timestamp}`
  ];
  for (const line of lines) {
    console.log(line);
  }
}
/**
 * Announces a newly connected client on the console.
 *
 * @param {object} data - Parsed payload; its `clientId` is printed.
 */
function handleClientConnect(data) {
  const { clientId } = data;
  const announcement = `👤 New client connected: ${clientId}`;
  console.log(announcement);
}
// 4. MQTT Client with Last Will and Testament (LWT)
/**
 * Creates a client whose presence is tracked through a retained status
 * message on `clients/<clientId>/status`.
 *
 * - On connect the client publishes a retained `online` status.
 * - If the connection drops without a clean disconnect, the broker publishes
 *   the Last Will (`offline` / `unexpected_disconnect`).
 * - For an intentional shutdown call `client.disconnectGracefully(callback)`,
 *   which publishes `offline` / `graceful_disconnect` while the connection is
 *   still open and then ends the client.
 *
 * The offline status is deliberately NOT published from the 'close' event:
 * 'close' also fires on unexpected drops, and a publish issued after the
 * socket has closed would be queued and could overwrite the retained `online`
 * status after the next reconnect.
 *
 * @param {string} clientId - MQTT client identifier, also used in the topic.
 * @returns {object} The mqtt.js client, extended with `disconnectGracefully`.
 */
function createClientWithLWT(clientId) {
  const statusTopic = `clients/${clientId}/status`;
  const publishOptions = { qos: 1, retain: true };

  const clientOptions = {
    ...options,
    clientId: clientId,
    will: {
      topic: statusTopic,
      payload: JSON.stringify({
        status: 'offline',
        // Captured when the options are built, not when the will is sent.
        timestamp: new Date().toISOString(),
        reason: 'unexpected_disconnect'
      }),
      qos: 1,
      retain: true
    }
  };

  const client = mqtt.connect(clientOptions);

  client.on('connect', function () {
    console.log(`Client ${clientId} connected`);
    // Publish online status
    const statusMessage = {
      status: 'online',
      timestamp: new Date().toISOString()
    };
    client.publish(statusTopic, JSON.stringify(statusMessage), publishOptions);
  });

  client.on('close', function () {
    console.log(`Client ${clientId} connection closed`);
  });

  // Publishes the graceful offline status, then closes the connection.
  client.disconnectGracefully = function (callback) {
    const statusMessage = {
      status: 'offline',
      timestamp: new Date().toISOString(),
      reason: 'graceful_disconnect'
    };
    client.publish(statusTopic, JSON.stringify(statusMessage), publishOptions, function () {
      console.log(`Client ${clientId} disconnected gracefully`);
      client.end(false, {}, callback);
    });
  };

  return client;
}
// 5. Retained Messages Example
/**
 * Publishes a retained configuration document to `config/sensor_001`.
 *
 * Because the message is retained, the broker hands it to every client that
 * subscribes to the topic later. The publish callback now checks its error
 * argument instead of reporting success unconditionally.
 *
 * @returns {object} The mqtt.js client returned by `mqtt.connect`.
 */
function demonstrateRetainedMessages() {
  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Demonstrating retained messages...');

    // Publish retained configuration data
    const config = {
      version: '1.0.0',
      updateInterval: 5000,
      maxRetries: 3,
      features: ['temperature', 'humidity', 'pressure']
    };

    client.publish(
      'config/sensor_001',
      JSON.stringify(config),
      { qos: 1, retain: true },
      (error) => {
        if (error) {
          console.error('Failed to publish retained configuration:', error);
          return;
        }
        console.log('Configuration published with retain flag');
        console.log('New subscribers will immediately receive this message');
      }
    );
  });

  return client;
}
// 6. Quality of Service (QoS) Levels Example
/**
 * Publishes one message at each MQTT Quality of Service level (0, 1 and 2)
 * to `qos/demo/<level>` and logs the outcome of every publish, including
 * failures (previously swallowed silently).
 *
 * @returns {object} The mqtt.js client returned by `mqtt.connect`.
 */
function demonstrateQoS() {
  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Demonstrating QoS levels...');

    const messages = [
      { topic: 'qos/demo/0', payload: 'QoS 0 - At most once', qos: 0 },
      { topic: 'qos/demo/1', payload: 'QoS 1 - At least once', qos: 1 },
      { topic: 'qos/demo/2', payload: 'QoS 2 - Exactly once', qos: 2 }
    ];

    messages.forEach(({ topic, payload, qos }) => {
      client.publish(topic, payload, { qos }, (error) => {
        if (error) {
          console.error(`Failed to publish with QoS ${qos}:`, error);
          return;
        }
        console.log(`Published with QoS ${qos}: ${payload}`);
      });
    });
  });

  return client;
}
// 7. Secure MQTT (MQTTS) Example
/**
 * Connects to a broker over TLS (MQTTS) using a CA certificate plus a client
 * certificate/key pair read from the current working directory.
 *
 * `fs` is required locally because the file never imports it at the top;
 * without this the function threw a ReferenceError on its first line.
 *
 * @returns {object} The mqtt.js client returned by `mqtt.connect`.
 * @throws {Error} If any of the certificate files cannot be read.
 */
function createSecureClient() {
  const fs = require('fs');

  const secureOptions = {
    host: 'mqtt.example.com',
    port: 8883,
    protocol: 'mqtts',
    // Refuse servers whose certificate does not validate against `ca`.
    rejectUnauthorized: true,
    ca: fs.readFileSync('./ca.crt'),
    cert: fs.readFileSync('./client.crt'),
    key: fs.readFileSync('./client.key'),
    // Up to 8 random hex characters; slice(2, 10) replaces deprecated substr(2, 8).
    clientId: `secure_client_${Math.random().toString(16).slice(2, 10)}`
  };

  const client = mqtt.connect(secureOptions);

  client.on('connect', function () {
    console.log('Secure MQTT connection established');
  });

  return client;
}
// 8. Topic Patterns and Wildcards
/**
 * Subscribes to a set of topic filters that illustrate the single-level (`+`)
 * and multi-level (`#`) wildcards as well as an exact match. Subscription
 * failures are logged instead of being ignored.
 *
 * @returns {object} The mqtt.js client returned by `mqtt.connect`.
 */
function demonstrateTopicPatterns() {
  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Demonstrating topic patterns...');

    // Subscribe using different wildcard patterns
    const subscriptions = [
      { topic: 'sensors/+/temperature', qos: 1 }, // Single-level wildcard
      { topic: 'devices/room1/#', qos: 1 }, // Multi-level wildcard
      { topic: 'alerts/+/critical', qos: 1 }, // Mixed pattern
      { topic: 'system/status', qos: 1 } // Exact match
    ];

    subscriptions.forEach(({ topic, qos }) => {
      client.subscribe(topic, { qos }, (error) => {
        if (error) {
          console.error(`Failed to subscribe to pattern ${topic}:`, error);
          return;
        }
        console.log(`Subscribed to pattern: ${topic}`);
      });
    });
  });

  return client;
}
// Run examples
// Entry point when this file is executed directly (not when it is required).
// Every client created here is tracked so that SIGINT can close all of them,
// not only the publisher, and the process exits only after the clients have
// finished closing (with a 2 s safety net).
if (require.main === module) {
  console.log('=== MQTT Examples ===\n');

  const clients = [];
  const startupTimers = [];

  // Start publisher
  clients.push(createPublisher());

  // Start subscriber
  startupTimers.push(setTimeout(() => {
    clients.push(createSubscriber());
  }, 1000));

  // Demonstrate other features
  startupTimers.push(setTimeout(() => {
    clients.push(demonstrateRetainedMessages());
    clients.push(demonstrateQoS());
  }, 2000));

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down MQTT clients...');

    // Do not start further clients once shutdown has begun.
    startupTimers.forEach((timer) => clearTimeout(timer));

    let remaining = clients.length;
    if (remaining === 0) {
      process.exit(0);
    }

    clients.forEach((client) => {
      client.end(false, {}, () => {
        remaining -= 1;
        if (remaining === 0) {
          process.exit(0);
        }
      });
    });

    // Safety net in case a client never reports that it has closed.
    setTimeout(() => process.exit(0), 2000).unref();
  });
}
// Public API: the example factories, exported so other modules can reuse them.
// Each function listed here is defined earlier in this file.
module.exports = {
createPublisher,
createSubscriber,
createClientWithLWT,
demonstrateRetainedMessages,
demonstrateQoS,
createSecureClient,
demonstrateTopicPatterns
};
💻 Simulation de Capteurs IoT avec MQTT javascript
🟡 intermediate
⭐⭐⭐
Simulation complète de capteurs IoT avec plusieurs types d'appareils et streaming de données en temps réel
⏱️ 25 min
🏷️ mqtt, iot, sensors, simulation, smart home
Prerequisites:
JavaScript, MQTT basics, Event-driven programming
// IoT Sensor Simulation with MQTT
// Simulates multiple types of IoT sensors publishing data via MQTT
const mqtt = require('mqtt');
const { EventEmitter } = require('events');
// MQTT Configuration
// Public Eclipse sandbox broker. The former host `mqtt.eclipse.org` has been
// retired; the sandbox is now served from mqtt.eclipseprojects.io.
const MQTT_BROKER = 'mqtt.eclipseprojects.io';
const MQTT_PORT = 1883;
// Base class for IoT sensors
/**
 * Base class for simulated IoT sensors that publish over MQTT.
 *
 * Emits: 'connected', 'offline', 'data' (every published message) and
 * 'error' (only when at least one 'error' listener is attached).
 * Subclasses override `startSimulation()` and are expected to store their
 * timer in `this.simulationInterval` so that `stop()` can clear it.
 */
class IoTSensor extends EventEmitter {
  /**
   * @param {object} config
   * @param {string} config.id - Device id, used in the client id and topics.
   * @param {string} config.name - Human-readable name used in log output.
   * @param {string} config.location - Location included in published data.
   * @param {number} [config.interval=5000] - Simulation period in ms.
   * @param {number} [config.errorRate=0.05] - Probability (0..1) that a
   *   message is flagged as a read error; 0 disables simulated errors.
   */
  constructor(config) {
    super();
    this.id = config.id;
    this.name = config.name;
    this.location = config.location;
    this.interval = config.interval || 5000;
    this.client = null;
    this.isRunning = false;
    this.simulationInterval = null;
    // `== null` keeps an explicit 0, which `||` would replace with the default.
    this.errorRate = config.errorRate == null ? 0.05 : config.errorRate;
  }

  /**
   * Opens the MQTT connection and wires up connection event logging.
   * A retained Last Will marks the device offline on unexpected disconnects.
   *
   * @returns {object} The mqtt.js client.
   */
  connect() {
    const options = {
      host: MQTT_BROKER,
      port: MQTT_PORT,
      clientId: `sensor_${this.id}`,
      will: {
        topic: `devices/${this.id}/status`,
        payload: JSON.stringify({
          status: 'offline',
          // Captured when connecting, not when the broker sends the will.
          timestamp: new Date().toISOString(),
          reason: 'unexpected_disconnect'
        }),
        qos: 1,
        retain: true
      }
    };

    this.client = mqtt.connect(options);

    this.client.on('connect', () => {
      console.log(`${this.constructor.name} ${this.name} connected`);
      this.publishStatus('online');
      this.emit('connected');
    });

    this.client.on('error', (error) => {
      console.error(`${this.name} connection error:`, error);
      // Emitting 'error' without a listener throws; the error is already logged.
      if (this.listenerCount('error') > 0) {
        this.emit('error', error);
      }
    });

    this.client.on('offline', () => {
      console.log(`${this.name} went offline`);
      this.emit('offline');
    });

    this.client.on('reconnect', () => {
      console.log(`${this.name} reconnecting...`);
    });

    return this.client;
  }

  /**
   * Connects if necessary and starts the simulation once the client is
   * connected. Starts immediately when the client is already connected and
   * never starts the simulation twice.
   */
  start() {
    if (!this.client) {
      this.connect();
    }

    const begin = () => {
      if (!this.isRunning) {
        this.isRunning = true;
        this.startSimulation();
      }
    };

    if (this.client.connected) {
      begin();
    } else {
      // `once` so repeated start() calls and reconnects do not stack listeners.
      this.client.once('connect', begin);
    }
  }

  /**
   * Stops the simulation timer, publishes a retained offline status and
   * closes the connection. The client reference is dropped so that a later
   * `start()` opens a fresh connection.
   */
  stop() {
    this.isRunning = false;

    if (this.simulationInterval) {
      clearInterval(this.simulationInterval);
      this.simulationInterval = null;
    }

    if (this.client) {
      this.publishStatus('offline', 'graceful_shutdown');
      this.client.end();
      this.client = null;
    }
  }

  /**
   * Publishes a retained status message to `devices/<id>/status`.
   *
   * @param {string} status - Status value, e.g. 'online' or 'offline'.
   * @param {string} [reason=''] - Optional explanation.
   */
  publishStatus(status, reason = '') {
    const statusMessage = {
      deviceId: this.id,
      deviceType: this.constructor.name,
      location: this.location,
      status: status,
      timestamp: new Date().toISOString(),
      reason: reason
    };

    this.client.publish(
      `devices/${this.id}/status`,
      JSON.stringify(statusMessage),
      { qos: 1, retain: true }
    );
  }

  /**
   * Publishes a reading wrapped with device metadata and emits it as 'data'.
   * With probability `errorRate` the message is flagged as a read error.
   *
   * @param {string} topic - Topic to publish to.
   * @param {object} data - Sensor-specific fields merged into the message.
   */
  publishData(topic, data) {
    const message = {
      deviceId: this.id,
      deviceType: this.constructor.name,
      location: this.location,
      timestamp: new Date().toISOString(),
      ...data
    };

    // Simulate occasional errors
    if (Math.random() < this.errorRate) {
      message.error = true;
      message.errorMessage = 'Sensor read error';
    }

    this.client.publish(topic, JSON.stringify(message), { qos: 1 });
    this.emit('data', message);
  }

  /** Hook for subclasses; the base implementation only logs. */
  startSimulation() {
    // Override in subclasses
    console.log(`Starting simulation for ${this.name}`);
  }
}
// Temperature Sensor
/**
 * Simulated temperature/humidity sensor publishing to
 * `sensors/temperature/<location>`.
 */
class TemperatureSensor extends IoTSensor {
  /**
   * @param {object} config - Base sensor config plus:
   * @param {number} [config.baseTemp=22] - Centre of the simulated range (°C).
   * @param {number} [config.variance=5] - Maximum deviation from `baseTemp`.
   */
  constructor(config) {
    super(config);
    // `== null` keeps an explicit 0 (a valid value), which `||` would discard.
    this.baseTemp = config.baseTemp == null ? 22 : config.baseTemp;
    this.variance = config.variance == null ? 5 : config.variance;
  }

  /** Publishes a random reading every `interval` ms while running. */
  startSimulation() {
    console.log(`🌡️ Starting temperature simulation for ${this.name}`);

    // Never leave an earlier timer running if this is called again.
    clearInterval(this.simulationInterval);
    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      // Uniform in [baseTemp - variance, baseTemp + variance).
      const temperature = this.baseTemp + (Math.random() - 0.5) * this.variance * 2;
      // Uniform in [30, 50) percent.
      const humidity = 40 + (Math.random() - 0.5) * 20;

      this.publishData(`sensors/temperature/${this.location}`, {
        temperature: parseFloat(temperature.toFixed(2)),
        unit: 'celsius',
        humidity: parseFloat(humidity.toFixed(2)),
        unitHumidity: 'percent'
      });
    }, this.interval);
  }
}
// Motion Sensor
/**
 * Simulated motion sensor publishing detections to
 * `sensors/motion/<location>`.
 */
class MotionSensor extends IoTSensor {
  /**
   * @param {object} config - Base sensor config plus:
   * @param {number} [config.sensitivity=0.3] - Probability (0..1) of a
   *   detection on each tick; 0 means motion is never reported.
   */
  constructor(config) {
    super(config);
    // `== null` keeps an explicit 0, which `||` would replace with 0.3.
    this.sensitivity = config.sensitivity == null ? 0.3 : config.sensitivity;
    this.lastMotion = null;
  }

  /** On every tick, publishes a detection with probability `sensitivity`. */
  startSimulation() {
    console.log(`🚪 Starting motion detection for ${this.name}`);

    // Never leave an earlier timer running if this is called again.
    clearInterval(this.simulationInterval);
    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      const motionDetected = Math.random() < this.sensitivity;
      if (motionDetected) {
        this.lastMotion = new Date().toISOString();
        this.publishData(`sensors/motion/${this.location}`, {
          motion: true,
          detectionTime: this.lastMotion,
          confidence: Math.random()
        });
      }
    }, this.interval);
  }
}
// Light Sensor
/**
 * Simulated ambient light sensor publishing to `sensors/light/<location>`.
 */
class LightSensor extends IoTSensor {
  /**
   * @param {object} config - Base sensor config plus:
   * @param {number} [config.baseLux=300] - Minimum daytime illuminance (lux).
   */
  constructor(config) {
    super(config);
    // `== null` keeps an explicit 0, which `||` would replace with 300.
    this.baseLux = config.baseLux == null ? 300 : config.baseLux;
  }

  /** Publishes a reading every `interval` ms following a day/night cycle. */
  startSimulation() {
    console.log(`💡 Starting light sensor for ${this.name}`);

    // Never leave an earlier timer running if this is called again.
    clearInterval(this.simulationInterval);
    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      // Simulate day/night cycle: local hours 6..18 inclusive count as day.
      const hour = new Date().getHours();
      const isDay = hour >= 6 && hour <= 18;
      const lux = isDay
        ? this.baseLux + Math.random() * 500
        : Math.random() * 50;

      this.publishData(`sensors/light/${this.location}`, {
        illuminance: parseFloat(lux.toFixed(2)),
        unit: 'lux',
        timeOfDay: isDay ? 'day' : 'night'
      });
    }, this.interval);
  }
}
// Door/Window Sensor
/**
 * Simulated door/window contact sensor publishing state changes to
 * `sensors/door/<location>`.
 */
class DoorSensor extends IoTSensor {
  /**
   * @param {object} config - Base sensor config plus:
   * @param {number} [config.changeProbability=0.1] - Probability (0..1) that
   *   the door toggles on each tick; 0 keeps it closed forever.
   */
  constructor(config) {
    super(config);
    this.isOpen = false;
    // `== null` keeps an explicit 0, which `||` would replace with 0.1.
    this.changeProbability =
      config.changeProbability == null ? 0.1 : config.changeProbability;
  }

  /** On every tick, toggles and publishes the state with `changeProbability`. */
  startSimulation() {
    console.log(`🚪 Starting door sensor for ${this.name}`);

    // Never leave an earlier timer running if this is called again.
    clearInterval(this.simulationInterval);
    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      const shouldChange = Math.random() < this.changeProbability;
      if (shouldChange) {
        this.isOpen = !this.isOpen;
        this.publishData(`sensors/door/${this.location}`, {
          state: this.isOpen ? 'open' : 'closed',
          lastChange: new Date().toISOString(),
          batteryLevel: 80 + Math.random() * 20
        });
      }
    }, this.interval);
  }
}
// Smart Camera
/**
 * Simulated smart camera publishing object detections to
 * `cameras/detection/<location>`.
 */
class SmartCamera extends IoTSensor {
  /**
   * @param {object} config - Base sensor config plus:
   * @param {number} [config.detectionRate=0.2] - Probability (0..1) of a
   *   detection on each tick; 0 means nothing is ever detected.
   */
  constructor(config) {
    super(config);
    this.detectionTypes = ['person', 'vehicle', 'animal', 'package'];
    // `== null` keeps an explicit 0, which `||` would replace with 0.2.
    this.detectionRate = config.detectionRate == null ? 0.2 : config.detectionRate;
  }

  /** On every tick, publishes a random detection with `detectionRate`. */
  startSimulation() {
    console.log(`📹 Starting smart camera for ${this.name}`);

    // Never leave an earlier timer running if this is called again.
    clearInterval(this.simulationInterval);
    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      if (Math.random() < this.detectionRate) {
        const detection = this.detectionTypes[Math.floor(Math.random() * this.detectionTypes.length)];
        // Confidence is drawn from [0.7, 1.0).
        const confidence = 0.7 + Math.random() * 0.3;

        this.publishData(`cameras/detection/${this.location}`, {
          detectionType: detection,
          confidence: parseFloat(confidence.toFixed(2)),
          snapshotUrl: `https://example.com/snapshots/${this.id}/${Date.now()}.jpg`,
          recording: Math.random() > 0.5
        });
      }
    }, this.interval);
  }
}
// Sensor Manager
/**
 * Owns a set of sensors, listens to their events and turns notable readings
 * into alerts that are logged and published over MQTT.
 */
class SensorManager {
  constructor() {
    this.sensors = [];
    // Optional overrides keyed by alert type. Supported key:
    // 'temperature_high' (defaults to 35 when not set).
    this.alertThresholds = new Map();
  }

  /**
   * Registers a sensor and subscribes to its 'data', 'error' and 'offline'
   * events.
   *
   * @param {object} sensor - Object exposing `on(event, listener)` and `id`.
   * @returns {object} The same sensor, for chaining.
   */
  addSensor(sensor) {
    this.sensors.push(sensor);

    // Set up event listeners
    sensor.on('data', (data) => {
      this.processSensorData(data);
    });

    sensor.on('error', (error) => {
      console.error(`Sensor ${sensor.id} error:`, error);
    });

    sensor.on('offline', () => {
      console.log(`Sensor ${sensor.id} went offline`);
    });

    return sensor;
  }

  /**
   * Inspects one published message and raises alerts where appropriate:
   * temperature above the high threshold, motion detected (log only), and a
   * DoorSensor reporting 'open'.
   *
   * @param {object} data - Message as emitted by a sensor's 'data' event.
   */
  processSensorData(data) {
    const highTemperature = this.alertThresholds.has('temperature_high')
      ? this.alertThresholds.get('temperature_high')
      : 35;

    // A missing temperature compares as false, so no extra guard is needed.
    if (data.temperature > highTemperature) {
      this.sendAlert({
        type: 'temperature_high',
        deviceId: data.deviceId,
        location: data.location,
        value: data.temperature,
        threshold: highTemperature,
        timestamp: data.timestamp
      });
    }

    if (data.motion) {
      console.log(`🚨 Motion detected at ${data.location}`);
    }

    if (data.state === 'open' && data.deviceType === 'DoorSensor') {
      this.sendAlert({
        type: 'door_open',
        deviceId: data.deviceId,
        location: data.location,
        timestamp: data.timestamp
      });
    }
  }

  /**
   * Logs an alert and publishes it to `alerts/<location>/<type>` over a
   * short-lived MQTT connection.
   *
   * @param {object} alert - Must contain `type` and `location`.
   */
  sendAlert(alert) {
    const alertMessage = {
      // slice(2, 11) replaces the deprecated substr(2, 9).
      id: `alert_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      ...alert,
      severity: alert.type.includes('temperature') ? 'warning' : 'info'
    };

    console.log(`🚨 ALERT: ${JSON.stringify(alertMessage, null, 2)}`);

    // Publish alert to MQTT
    const alertClient = mqtt.connect(`mqtt://${MQTT_BROKER}:${MQTT_PORT}`);

    // Without an 'error' listener a connection failure would be thrown as an
    // unhandled 'error' event and crash the process.
    alertClient.on('error', (error) => {
      console.error('Failed to publish alert:', error);
      alertClient.end(true);
    });

    alertClient.on('connect', () => {
      alertClient.publish(
        `alerts/${alert.location}/${alert.type}`,
        JSON.stringify(alertMessage),
        { qos: 1 }
      );
      alertClient.end();
    });
  }

  /** Starts every registered sensor. */
  startAll() {
    console.log('\n🚀 Starting all sensors...');
    this.sensors.forEach(sensor => sensor.start());
  }

  /** Stops every registered sensor. */
  stopAll() {
    console.log('\n🛑 Stopping all sensors...');
    this.sensors.forEach(sensor => sensor.stop());
  }

  /**
   * @returns {{totalSensors: number, onlineSensors: number, sensorTypes: string[]}}
   *   Sensor count, number with `isRunning` set, and distinct class names.
   */
  getStatus() {
    return {
      totalSensors: this.sensors.length,
      onlineSensors: this.sensors.filter(s => s.isRunning).length,
      sensorTypes: [...new Set(this.sensors.map(s => s.constructor.name))]
    };
  }
}
// Create and configure sensors
/**
 * Builds a SensorManager populated with the demo smart-home devices:
 * three living-room sensors, two bedroom sensors, a front-door contact
 * sensor and a front-door camera.
 *
 * @returns {SensorManager} Manager with all sensors registered (not started).
 */
function createSmartHome() {
  const manager = new SensorManager();

  // [sensor class, constructor config], in registration order.
  const deviceTable = [
    // Living room sensors
    [TemperatureSensor, {
      id: 'temp_001',
      name: 'Living Room Temp Sensor',
      location: 'living_room',
      interval: 5000,
      baseTemp: 22,
      variance: 3
    }],
    [MotionSensor, {
      id: 'motion_001',
      name: 'Living Room Motion Sensor',
      location: 'living_room',
      interval: 2000,
      sensitivity: 0.3
    }],
    [LightSensor, {
      id: 'light_001',
      name: 'Living Room Light Sensor',
      location: 'living_room',
      interval: 3000,
      baseLux: 400
    }],
    // Bedroom sensors
    [TemperatureSensor, {
      id: 'temp_002',
      name: 'Bedroom Temp Sensor',
      location: 'bedroom',
      interval: 6000,
      baseTemp: 20,
      variance: 2
    }],
    [DoorSensor, {
      id: 'door_001',
      name: 'Bedroom Door Sensor',
      location: 'bedroom',
      interval: 4000,
      changeProbability: 0.1
    }],
    // Front door
    [DoorSensor, {
      id: 'door_002',
      name: 'Front Door Sensor',
      location: 'front_door',
      interval: 2000,
      changeProbability: 0.05
    }],
    // Outdoor camera
    [SmartCamera, {
      id: 'cam_001',
      name: 'Front Door Camera',
      location: 'front_door',
      interval: 5000,
      detectionRate: 0.3
    }]
  ];

  for (const [SensorClass, sensorConfig] of deviceTable) {
    manager.addSensor(new SensorClass(sensorConfig));
  }

  return manager;
}
// Run the simulation
// Entry point when this file is executed directly: start every sensor,
// print a status line every 10 seconds and stop everything on Ctrl+C.
if (require.main === module) {
  console.log('🏠 IoT Smart Home Simulation\n');

  const smartHome = createSmartHome();

  // Prints how many sensors are running and which sensor types exist.
  const reportStatus = () => {
    const status = smartHome.getStatus();
    const { onlineSensors, totalSensors, sensorTypes } = status;
    console.log(`\n📊 Status: ${onlineSensors}/${totalSensors} sensors online`);
    console.log(` Types: ${sensorTypes.join(', ')}`);
  };

  // Stops all sensors, then exits after 2 s so pending publishes can go out.
  const shutDown = () => {
    console.log('\n👋 Shutting down smart home simulation...');
    smartHome.stopAll();
    setTimeout(() => process.exit(0), 2000);
  };

  // Start simulation
  smartHome.startAll();

  // Display status every 10 seconds
  setInterval(reportStatus, 10000);

  // Graceful shutdown
  process.on('SIGINT', shutDown);
}
// Public API: the sensor base class, its concrete sensor types and the manager.
// Note: createSmartHome is not part of this export list.
module.exports = {
IoTSensor,
TemperatureSensor,
MotionSensor,
LightSensor,
DoorSensor,
SmartCamera,
SensorManager
};
💻 Implémentation MQTT Python python
🟡 intermediate
⭐⭐⭐
Exemples de client MQTT Python utilisant la librairie Paho-MQTT pour applications IoT professionnelles
⏱️ 30 min
🏷️ mqtt, python, iot, smart home, monitoring
Prerequisites:
Python, MQTT basics, Threading
# MQTT Python Examples
# pip install paho-mqtt
import paho.mqtt.client as mqtt
import json
import time
import threading
import logging
from datetime import datetime
from typing import Dict, List, Optional, Callable
import random
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# MQTT Configuration
# Public Eclipse sandbox broker. The former host "mqtt.eclipse.org" has been
# retired; the sandbox is now served from mqtt.eclipseprojects.io.
MQTT_BROKER = "mqtt.eclipseprojects.io"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60


class MQTTClient:
    """Wrapper around a paho-mqtt client with per-topic callbacks.

    Tracks subscriptions so they can be re-established after a reconnect,
    decodes incoming payloads (JSON when possible, otherwise text) and
    dispatches each message to every registered callback whose topic filter
    matches the message topic.
    """

    def __init__(self, client_id: Optional[str] = None, username: Optional[str] = None,
                 password: Optional[str] = None):
        """Create the underlying paho client and register its event handlers.

        Args:
            client_id: MQTT client id; a random ``python_client_NNNN`` id is
                generated when omitted.
            username: Optional user name for broker authentication.
            password: Optional password; credentials are only set when both
                user name and password are given.
        """
        self.client_id = client_id or f"python_client_{random.randint(1000, 9999)}"
        self.username = username
        self.password = password
        # paho-mqtt >= 2.0 requires an explicit callback API version. The
        # handlers below use the version 1 signatures, so request VERSION1
        # when the library offers the choice and fall back for paho 1.x.
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=self.client_id)
        else:
            self.client = mqtt.Client(client_id=self.client_id)
        self.connected = False
        self.message_callbacks = {}
        self.subscription_topics = set()
        # QoS requested per topic, so a re-subscribe keeps the original level.
        self._subscription_qos = {}
        self.last_will_topic = None
        self.last_will_message = None

        # Set up authentication
        if username and password:
            self.client.username_pw_set(username, password)

        # Set up event handlers
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message
        self.client.on_publish = self._on_publish
        self.client.on_subscribe = self._on_subscribe
        self.client.on_unsubscribe = self._on_unsubscribe

    def _on_connect(self, client, userdata, flags, rc):
        """Called when the client connects to the broker"""
        if rc == 0:
            self.connected = True
            logger.info(f"Connected to MQTT broker with client ID: {self.client_id}")
            # Resubscribe to all topics with the QoS they were requested at.
            # Iterate over a copy because subscribe() updates the set.
            for topic in list(self.subscription_topics):
                self.subscribe(topic, self._subscription_qos.get(topic, 1))
        else:
            logger.error(f"Failed to connect to MQTT broker. Return code: {rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Called when the client disconnects"""
        self.connected = False
        if rc != 0:
            logger.warning(f"Unexpected disconnection from MQTT broker. Return code: {rc}")
        else:
            logger.info("Disconnected from MQTT broker")

    def _on_message(self, client, userdata, msg):
        """Decode an incoming message and dispatch it to matching callbacks.

        Every callback whose registered topic equals the message topic or
        matches it as a wildcard filter is invoked. A failure in one callback
        is logged and does not prevent the others from running.
        """
        try:
            topic = msg.topic
            payload = msg.payload.decode('utf-8')

            # Try to parse as JSON first
            try:
                data = json.loads(payload)
                logger.info(f"Received JSON message on {topic}: {data}")
            except json.JSONDecodeError:
                data = payload
                logger.info(f"Received message on {topic}: {payload}")
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            return

        # Snapshot the registry: callbacks may subscribe or unsubscribe.
        for registered_topic, callback in list(self.message_callbacks.items()):
            if registered_topic == topic or self._topic_matches(registered_topic, topic):
                try:
                    callback(topic, data)
                except Exception as e:
                    logger.error(f"Error processing message: {e}")

    def _on_publish(self, client, userdata, mid):
        """Called when a message is published"""
        logger.debug(f"Message published with mid: {mid}")

    def _on_subscribe(self, client, userdata, mid, granted_qos):
        """Called when the client subscribes to a topic"""
        logger.info(f"Subscribed with mid: {mid}, QoS: {granted_qos}")

    def _on_unsubscribe(self, client, userdata, mid):
        """Called when the client unsubscribes from a topic"""
        logger.info(f"Unsubscribed with mid: {mid}")

    def _topic_matches(self, pattern: str, topic: str) -> bool:
        """Check whether ``topic`` matches the subscription filter ``pattern``.

        ``+`` matches exactly one level; ``#`` matches the parent level and
        any number of levels below it. All levels before a ``#`` must match,
        so ``a/b/#`` does not match ``a``.
        """
        pattern_parts = pattern.split('/')
        topic_parts = topic.split('/')

        for index, pattern_part in enumerate(pattern_parts):
            if pattern_part == '#':
                return True
            if index >= len(topic_parts):
                # The filter still has levels left but the topic has run out.
                return False
            if pattern_part != '+' and pattern_part != topic_parts[index]:
                return False

        # Without '#', filter and topic must have the same number of levels.
        return len(pattern_parts) == len(topic_parts)

    def set_last_will(self, topic: str, message: str, qos: int = 1, retain: bool = False):
        """Set the Last Will and Testament message (call before connect)."""
        self.last_will_topic = topic
        self.last_will_message = message
        self.client.will_set(topic, message.encode('utf-8'), qos, retain)

    def connect(self, broker: str = MQTT_BROKER, port: int = MQTT_PORT,
                keepalive: int = MQTT_KEEPALIVE) -> bool:
        """Connect to the broker and start the network loop thread.

        Blocks for up to 10 seconds waiting for the connection callback.

        Returns:
            True when connected within the timeout, otherwise False.
        """
        try:
            self.client.connect(broker, port, keepalive)
            self.client.loop_start()

            # Wait for connection
            timeout = 10  # seconds
            start_time = time.time()
            while not self.connected and time.time() - start_time < timeout:
                time.sleep(0.1)

            return self.connected
        except Exception as e:
            logger.error(f"Failed to connect to MQTT broker: {e}")
            return False

    def disconnect(self):
        """Disconnect from the MQTT broker"""
        if self.connected:
            self.client.loop_stop()
            self.client.disconnect()

    def subscribe(self, topic: str, qos: int = 1, callback: Optional[Callable] = None) -> bool:
        """Subscribe to a topic and optionally register a callback for it.

        Args:
            topic: Topic or topic filter.
            qos: Requested quality of service.
            callback: Called as ``callback(topic, data)`` for matching messages.

        Returns:
            True when the subscribe request was accepted by the client library.
        """
        try:
            result, mid = self.client.subscribe(topic, qos)
            if result == mqtt.MQTT_ERR_SUCCESS:
                self.subscription_topics.add(topic)
                self._subscription_qos[topic] = qos
                if callback:
                    self.message_callbacks[topic] = callback
                logger.info(f"Subscribed to topic: {topic}")
                return True
            else:
                logger.error(f"Failed to subscribe to topic: {topic}")
                return False
        except Exception as e:
            logger.error(f"Error subscribing to topic: {e}")
            return False

    def unsubscribe(self, topic: str) -> bool:
        """Unsubscribe from a topic and drop its callback."""
        try:
            result, mid = self.client.unsubscribe(topic)
            if result == mqtt.MQTT_ERR_SUCCESS:
                self.subscription_topics.discard(topic)
                self._subscription_qos.pop(topic, None)
                if topic in self.message_callbacks:
                    del self.message_callbacks[topic]
                logger.info(f"Unsubscribed from topic: {topic}")
                return True
            else:
                logger.error(f"Failed to unsubscribe from topic: {topic}")
                return False
        except Exception as e:
            logger.error(f"Error unsubscribing from topic: {e}")
            return False

    def publish(self, topic: str, payload, qos: int = 1, retain: bool = False) -> bool:
        """Publish a message to a topic.

        Dicts and lists are serialised as JSON, strings are UTF-8 encoded,
        bytes and None are passed through unchanged, and any other value is
        converted with ``str()`` first.

        Returns:
            True when the client library accepted the message.
        """
        try:
            if isinstance(payload, (dict, list)):
                payload = json.dumps(payload)

            if isinstance(payload, str):
                encoded = payload.encode('utf-8')
            elif payload is None or isinstance(payload, (bytes, bytearray)):
                encoded = payload
            else:
                encoded = str(payload).encode('utf-8')

            result = self.client.publish(topic, encoded, qos, retain)
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.info(f"Published to {topic}: {payload}")
                return True
            else:
                logger.error(f"Failed to publish to {topic}")
                return False
        except Exception as e:
            logger.error(f"Error publishing message: {e}")
            return False

    def add_callback(self, topic: str, callback: Callable):
        """Add a callback for a specific topic"""
        self.message_callbacks[topic] = callback
# Example 1: Temperature Monitor
class TemperatureMonitor:
    """Monitors temperature sensors and alerts on thresholds.

    Subscribes to ``sensors/temperature/+``, remembers the latest reading per
    location and publishes an alert to ``alerts/<location>/temperature`` when
    a reading is above ``alert_threshold`` or below ``low_threshold``.
    """

    def __init__(self, client: "MQTTClient"):
        self.client = client
        self.temperature_readings = {}
        self.alert_threshold = 30.0  # Celsius
        self.low_threshold = 10.0

        # Subscribe to temperature topics
        self.client.subscribe("sensors/temperature/+", qos=1, callback=self.handle_temperature)

    def handle_temperature(self, topic: str, data: dict):
        """Handle incoming temperature data.

        Messages that are not JSON objects or carry no ``temperature`` field
        are ignored. Defaulting a missing reading to 0 would raise a false
        LOW alert.
        """
        try:
            if not isinstance(data, dict) or data.get('temperature') is None:
                logger.warning(f"Ignoring message without a temperature reading on {topic}")
                return

            # The location is the last topic level: sensors/temperature/<location>.
            location = topic.split('/')[-1]
            temperature = float(data['temperature'])
            timestamp = data.get('timestamp', datetime.now().isoformat())

            # Store reading
            self.temperature_readings[location] = {
                'temperature': temperature,
                'timestamp': timestamp
            }

            # Check thresholds
            if temperature > self.alert_threshold:
                self.send_alert(location, temperature, "HIGH")
            elif temperature < self.low_threshold:
                self.send_alert(location, temperature, "LOW")
            else:
                logger.info(f"🌡️ {location}: {temperature}°C - Normal")
        except Exception as e:
            logger.error(f"Error handling temperature data: {e}")

    def send_alert(self, location: str, temperature: float, alert_type: str):
        """Publish and log a temperature alert.

        Args:
            location: Location the reading came from.
            temperature: The offending reading.
            alert_type: "HIGH" or "LOW"; selects which threshold is reported.
        """
        alert = {
            'type': 'temperature_alert',
            'alert_type': alert_type,
            'location': location,
            'temperature': temperature,
            'threshold': self.alert_threshold if alert_type == "HIGH" else self.low_threshold,
            'timestamp': datetime.now().isoformat()
        }

        self.client.publish(f"alerts/{location}/temperature", alert)
        logger.warning(f"🚨 Temperature alert - {location}: {temperature}°C ({alert_type})")
# Example 2: Smart Home Controller
class SmartHomeController:
    """Controls smart home devices based on sensor data.

    Keeps the device states in ``self.devices`` (booleans for switchable
    devices, a number for the thermostat setpoint) and publishes every change
    under ``devices/<device>/...``.
    """

    def __init__(self, client: "MQTTClient"):
        self.client = client
        self.devices = {
            'living_room_light': False,
            'bedroom_light': False,
            'thermostat': 22,
            'security_system': False
        }

        # Subscribe to sensor and control topics
        self.client.subscribe("sensors/motion/+", callback=self.handle_motion)
        self.client.subscribe("sensors/light/+", callback=self.handle_light)
        self.client.subscribe("control/+", callback=self.handle_control)
        self.client.subscribe("devices/+/status", callback=self.handle_device_status)

    def handle_motion(self, topic: str, data: dict):
        """Handle motion sensor data; non-object payloads are ignored."""
        if not isinstance(data, dict):
            return

        location = topic.split('/')[-1]
        if data.get('motion', False):
            logger.info(f"🚶 Motion detected in {location}")

            # Turn on lights when motion is detected
            if location == 'living_room':
                self.toggle_device('living_room_light', True)
            elif location == 'bedroom':
                self.toggle_device('bedroom_light', True)

            # Activate security at night (before 06:00 or from 23:00 local time)
            hour = datetime.now().hour
            if hour < 6 or hour > 22:
                self.toggle_device('security_system', True)

    def handle_light(self, topic: str, data: dict):
        """Handle light sensor data.

        Messages without an ``illuminance`` value are ignored. Treating a
        missing value as 0 lux would switch the light on.
        """
        if not isinstance(data, dict) or data.get('illuminance') is None:
            return

        location = topic.split('/')[-1]
        illuminance = float(data['illuminance'])

        # Automatic lighting based on ambient light
        if illuminance < 100 and location == 'living_room':
            self.toggle_device('living_room_light', True)
            logger.info(f"💡 Low light detected in {location}, turning on lights")
        elif illuminance > 300 and location == 'living_room':
            self.toggle_device('living_room_light', False)
            logger.info(f"☀️ Sufficient light in {location}, turning off lights")

    def handle_control(self, topic: str, data: dict):
        """Handle control commands (``toggle``, ``set_thermostat``, ``security``)."""
        try:
            command = data.get('command')
            device = data.get('device')
            value = data.get('value')

            if command == 'toggle':
                self.toggle_device(device, value)
            elif command == 'set_thermostat':
                self.set_thermostat(value)
            elif command == 'security':
                self.toggle_device('security_system', value)

            logger.info(f"🎮 Control command received: {command} for {device}")
        except Exception as e:
            logger.error(f"Error handling control command: {e}")

    def handle_device_status(self, topic: str, data: dict):
        """Handle device status updates; non-object payloads are ignored."""
        if not isinstance(data, dict):
            return

        # Topic layout: devices/<device_id>/status
        device_id = topic.split('/')[1]
        status = data.get('status')

        if status == 'offline':
            logger.warning(f"⚠️ Device {device_id} is offline")
            # Could trigger alert or attempt reconnection here

    def toggle_device(self, device: str, state: bool):
        """Switch an on/off device and publish its new state.

        Only devices whose stored state is a boolean can be toggled. This
        keeps a ``toggle`` command from overwriting the numeric thermostat
        setpoint. Unknown or non-switchable devices are logged and skipped.
        """
        if device in self.devices and isinstance(self.devices[device], bool):
            self.devices[device] = state

            # Publish device state change
            message = {
                'device': device,
                'state': state,
                'timestamp': datetime.now().isoformat()
            }

            self.client.publish(f"devices/{device}/state", message)
            logger.info(f"🔌 {device} turned {'on' if state else 'off'}")
        else:
            logger.warning(f"Cannot toggle unknown or non-switchable device: {device}")

    def set_thermostat(self, temperature: float):
        """Set the thermostat setpoint; non-numeric values are rejected."""
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(temperature, bool) or not isinstance(temperature, (int, float)):
            logger.error(f"Invalid thermostat setpoint: {temperature!r}")
            return

        self.devices['thermostat'] = temperature

        message = {
            'device': 'thermostat',
            'temperature': temperature,
            'timestamp': datetime.now().isoformat()
        }

        self.client.publish("devices/thermostat/setpoint", message)
        logger.info(f"🌡️ Thermostat set to {temperature}°C")
# Example 3: Data Logger
class MQTTDataLogger:
    """Appends sensor data and device state changes to a log file.

    Each entry is written as one JSON document per line.
    """

    def __init__(self, client: "MQTTClient", log_file: str = "mqtt_data.log"):
        """Subscribe to the logged topics.

        Args:
            client: Client used for the subscriptions.
            log_file: Path of the file entries are appended to.
        """
        self.client = client
        self.log_file = log_file

        # Subscribe to all sensor data. Sensor topics have the form
        # sensors/<type>/<location>, so the multi-level wildcard is required;
        # "sensors/+" would only match two-level topics.
        self.client.subscribe("sensors/#", callback=self.log_sensor_data)
        self.client.subscribe("devices/+/state", callback=self.log_device_state)

    def log_sensor_data(self, topic: str, data):
        """Log sensor data"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'topic': topic,
            'type': 'sensor_data',
            'data': data
        }
        self._write_log(log_entry)

    def log_device_state(self, topic: str, data):
        """Log device state changes"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'topic': topic,
            'type': 'device_state',
            'data': data
        }
        self._write_log(log_entry)

    def _write_log(self, log_entry: dict):
        """Append one entry to the log file as a single JSON line.

        Failures (I/O errors, unserialisable data) are logged, not raised.
        """
        try:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry) + '\n')
        except Exception as e:
            logger.error(f"Error writing to log file: {e}")
# Main application
def main():
    """Run the demo: connect, wire up the components and simulate devices.

    A background thread publishes simulated temperature and motion readings
    every 5 seconds until Ctrl+C is pressed. Cleanup (stopping the simulation,
    publishing the retained offline status and disconnecting) runs in a
    ``finally`` block, so it also happens when the main loop ends for a reason
    other than KeyboardInterrupt.
    """
    logger.info("🚀 Starting MQTT Python Application")

    # Create MQTT client with Last Will
    client = MQTTClient(client_id="python_controller")
    client.set_last_will(
        "controllers/python_controller/status",
        json.dumps({
            'status': 'offline',
            'timestamp': datetime.now().isoformat(),
            'reason': 'unexpected_disconnect'
        }),
        qos=1,
        retain=True
    )

    # Connect to broker
    if not client.connect():
        logger.error("Failed to connect to MQTT broker")
        return

    # Publish online status
    client.publish(
        "controllers/python_controller/status",
        json.dumps({
            'status': 'online',
            'timestamp': datetime.now().isoformat()
        }),
        retain=True
    )

    # Initialize components. Each one registers its callbacks with the client
    # in its constructor; the local names only keep them easy to inspect.
    temperature_monitor = TemperatureMonitor(client)
    smart_home_controller = SmartHomeController(client)
    data_logger = MQTTDataLogger(client)

    # Set on shutdown so the simulation thread stops promptly.
    stop_event = threading.Event()

    # Simulate some device updates
    def simulate_devices():
        while not stop_event.is_set():
            # Simulate temperature sensor
            temp_data = {
                'temperature': round(random.uniform(15, 35), 2),
                'timestamp': datetime.now().isoformat()
            }
            client.publish("sensors/temperature/living_room", temp_data)

            # Simulate motion sensor
            if random.random() < 0.3:
                motion_data = {
                    'motion': True,
                    'timestamp': datetime.now().isoformat()
                }
                client.publish("sensors/motion/living_room", motion_data)

            # Sleeps up to 5 s but wakes immediately when stop is requested.
            stop_event.wait(5)

    # Start simulation in background thread
    simulation_thread = threading.Thread(target=simulate_devices, daemon=True)
    simulation_thread.start()

    try:
        logger.info("✅ Application running. Press Ctrl+C to stop.")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("👋 Shutting down application...")
    finally:
        # Stop the simulation first so nothing is published after disconnect.
        stop_event.set()
        simulation_thread.join(timeout=1)

        # Publish offline status
        client.publish(
            "controllers/python_controller/status",
            json.dumps({
                'status': 'offline',
                'timestamp': datetime.now().isoformat(),
                'reason': 'graceful_shutdown'
            }),
            retain=True
        )

        client.disconnect()
        logger.info("🔌 Application stopped")


if __name__ == "__main__":
    main()