🎯 empfohlene Sammlungen
Balanced sample collections from various categories for you to explore
MQTT Nachrichtenprotokoll
MQTT (Message Queuing Telemetry Transport) Protokollbeispiele für IoT und Echtzeit-Nachrichtenübermittlung mit Publish/Subscribe-Patterns
💻 MQTT Grundkonzepte und Anwendung javascript
🟢 simple
⭐⭐
Einführung in das MQTT-Protokoll mit grundlegenden Publisher- und Subscriber-Beispielen
⏱️ 15 min
🏷️ mqtt, iot, messaging, publish-subscribe
Prerequisites:
Node.js basics, Understanding of pub/sub patterns
// MQTT Basic Examples using mqtt.js library
// npm install mqtt
const mqtt = require('mqtt');
// MQTT Broker connection options
const options = {
host: 'mqtt.eclipse.org', // Public MQTT broker
port: 1883,
protocol: 'mqtt',
keepalive: 60,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
will: {
topic: 'clients/disconnect',
payload: 'Client disconnected unexpectedly',
qos: 1,
retain: false
}
};
// 1. Basic Publisher Example
function createPublisher() {
console.log('Creating MQTT Publisher...');
const client = mqtt.connect(options);
client.on('connect', function () {
console.log('Publisher connected to MQTT broker');
// Publish messages periodically
let counter = 0;
setInterval(() => {
const message = {
deviceId: 'sensor_001',
temperature: Math.random() * 30 + 20,
humidity: Math.random() * 50 + 30,
timestamp: new Date().toISOString(),
counter: counter++
};
const topic = 'sensors/temperature/room1';
const payload = JSON.stringify(message);
// Publish with QoS 1 (at least once delivery)
client.publish(topic, payload, { qos: 1, retain: false }, (error) => {
if (error) {
console.error('Failed to publish message:', error);
} else {
console.log(`Message published to ${topic}: ${payload}`);
}
});
}, 2000); // Publish every 2 seconds
});
client.on('error', function (error) {
console.error('MQTT connection error:', error);
});
client.on('offline', function () {
console.log('Publisher offline');
});
client.on('reconnect', function () {
console.log('Publisher reconnecting...');
});
return client;
}
// 2. Basic Subscriber Example
function createSubscriber() {
console.log('Creating MQTT Subscriber...');
const client = mqtt.connect(options);
client.on('connect', function () {
console.log('Subscriber connected to MQTT broker');
// Subscribe to topics
const topics = [
'sensors/temperature/+', // Wildcard for all rooms
'sensors/humidity/+',
'alerts/+/critical',
'clients/connect'
];
topics.forEach(topic => {
client.subscribe(topic, { qos: 1 }, (error, granted) => {
if (error) {
console.error(`Failed to subscribe to ${topic}:`, error);
} else {
console.log(`Subscribed to ${topic} with QoS ${granted[0].qos}`);
}
});
});
});
client.on('message', function (topic, message) {
try {
const payload = JSON.parse(message.toString());
console.log(`Received message from ${topic}:`, payload);
// Process different types of messages
if (topic.startsWith('sensors/temperature/')) {
handleTemperatureData(topic, payload);
} else if (topic.startsWith('alerts/')) {
handleAlert(topic, payload);
} else if (topic === 'clients/connect') {
handleClientConnect(payload);
}
} catch (error) {
console.log(`Received message from ${topic}: ${message.toString()}`);
}
});
client.on('error', function (error) {
console.error('MQTT connection error:', error);
});
return client;
}
// 3. Message Handlers
function handleTemperatureData(topic, data) {
const room = topic.split('/')[2];
console.log(`🌡️ Temperature in ${room}: ${data.temperature.toFixed(2)}°C`);
// Alert if temperature is too high
if (data.temperature > 40) {
console.log(`🚨 High temperature alert in ${room}!`);
}
}
function handleAlert(topic, alert) {
console.log(`🚨 ALERT: ${alert.message}`);
console.log(` Severity: ${alert.severity}`);
console.log(` Timestamp: ${alert.timestamp}`);
}
function handleClientConnect(data) {
console.log(`👤 New client connected: ${data.clientId}`);
}
// 4. MQTT Client with Last Will and Testament (LWT)
function createClientWithLWT(clientId) {
const clientOptions = {
...options,
clientId: clientId,
will: {
topic: `clients/${clientId}/status`,
payload: JSON.stringify({
status: 'offline',
timestamp: new Date().toISOString(),
reason: 'unexpected_disconnect'
}),
qos: 1,
retain: true
}
};
const client = mqtt.connect(clientOptions);
client.on('connect', function () {
console.log(`Client ${clientId} connected`);
// Publish online status
const statusMessage = {
status: 'online',
timestamp: new Date().toISOString()
};
client.publish(
`clients/${clientId}/status`,
JSON.stringify(statusMessage),
{ qos: 1, retain: true }
);
});
client.on('close', function () {
console.log(`Client ${clientId} disconnected gracefully`);
// Publish offline status
const statusMessage = {
status: 'offline',
timestamp: new Date().toISOString(),
reason: 'graceful_disconnect'
};
client.publish(
`clients/${clientId}/status`,
JSON.stringify(statusMessage),
{ qos: 1, retain: true }
);
});
return client;
}
// 5. Retained Messages Example
function demonstrateRetainedMessages() {
const client = mqtt.connect(options);
client.on('connect', function () {
console.log('Demonstrating retained messages...');
// Publish retained configuration data
const config = {
version: '1.0.0',
updateInterval: 5000,
maxRetries: 3,
features: ['temperature', 'humidity', 'pressure']
};
client.publish(
'config/sensor_001',
JSON.stringify(config),
{ qos: 1, retain: true },
() => {
console.log('Configuration published with retain flag');
console.log('New subscribers will immediately receive this message');
}
);
});
return client;
}
// 6. Quality of Service (QoS) Levels Example
function demonstrateQoS() {
const client = mqtt.connect(options);
client.on('connect', function () {
console.log('Demonstrating QoS levels...');
const messages = [
{ topic: 'qos/demo/0', payload: 'QoS 0 - At most once', qos: 0 },
{ topic: 'qos/demo/1', payload: 'QoS 1 - At least once', qos: 1 },
{ topic: 'qos/demo/2', payload: 'QoS 2 - Exactly once', qos: 2 }
];
messages.forEach(({ topic, payload, qos }) => {
client.publish(topic, payload, { qos }, (error) => {
if (!error) {
console.log(`Published with QoS ${qos}: ${payload}`);
}
});
});
});
return client;
}
// 7. Secure MQTT (MQTTS) Example
function createSecureClient() {
const secureOptions = {
host: 'mqtt.example.com',
port: 8883,
protocol: 'mqtts',
rejectUnauthorized: true,
ca: fs.readFileSync('./ca.crt'),
cert: fs.readFileSync('./client.crt'),
key: fs.readFileSync('./client.key'),
clientId: `secure_client_${Math.random().toString(16).substr(2, 8)}`
};
const client = mqtt.connect(secureOptions);
client.on('connect', function () {
console.log('Secure MQTT connection established');
});
return client;
}
// 8. Topic Patterns and Wildcards
function demonstrateTopicPatterns() {
const client = mqtt.connect(options);
client.on('connect', function () {
console.log('Demonstrating topic patterns...');
// Subscribe using different wildcard patterns
const subscriptions = [
{ topic: 'sensors/+/temperature', qos: 1 }, // Single-level wildcard
{ topic: 'devices/room1/#', qos: 1 }, // Multi-level wildcard
{ topic: 'alerts/+/critical', qos: 1 }, // Mixed pattern
{ topic: 'system/status', qos: 1 } // Exact match
];
subscriptions.forEach(({ topic, qos }) => {
client.subscribe(topic, { qos }, (error) => {
if (!error) {
console.log(`Subscribed to pattern: ${topic}`);
}
});
});
});
return client;
}
// Run examples
if (require.main === module) {
console.log('=== MQTT Examples ===\n');
// Start publisher
const publisher = createPublisher();
// Start subscriber
setTimeout(() => {
const subscriber = createSubscriber();
}, 1000);
// Demonstrate other features
setTimeout(() => {
demonstrateRetainedMessages();
demonstrateQoS();
}, 2000);
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down MQTT clients...');
publisher.end();
process.exit(0);
});
}
module.exports = {
createPublisher,
createSubscriber,
createClientWithLWT,
demonstrateRetainedMessages,
demonstrateQoS,
createSecureClient,
demonstrateTopicPatterns
};
💻 IoT-Sensor-Simulation mit MQTT javascript
🟡 intermediate
⭐⭐⭐
Vollständige IoT-Sensor-Simulation mit mehreren Gerätetypen und Echtzeit-Datenstreaming
⏱️ 25 min
🏷️ mqtt, iot, sensors, simulation, smart home
Prerequisites:
JavaScript, MQTT basics, Event-driven programming
// IoT Sensor Simulation with MQTT
// Simulates multiple types of IoT sensors publishing data via MQTT
const mqtt = require('mqtt');
const { EventEmitter } = require('events');
// MQTT Configuration
const MQTT_BROKER = 'mqtt.eclipse.org';
const MQTT_PORT = 1883;
// Base class for IoT sensors
class IoTSensor extends EventEmitter {
constructor(config) {
super();
this.id = config.id;
this.name = config.name;
this.location = config.location;
this.interval = config.interval || 5000;
this.client = null;
this.isRunning = false;
this.errorRate = config.errorRate || 0.05; // 5% error rate
}
connect() {
const options = {
host: MQTT_BROKER,
port: MQTT_PORT,
clientId: `sensor_${this.id}`,
will: {
topic: `devices/${this.id}/status`,
payload: JSON.stringify({
status: 'offline',
timestamp: new Date().toISOString(),
reason: 'unexpected_disconnect'
}),
qos: 1,
retain: true
}
};
this.client = mqtt.connect(options);
this.client.on('connect', () => {
console.log(`${this.constructor.name} ${this.name} connected`);
this.publishStatus('online');
this.emit('connected');
});
this.client.on('error', (error) => {
console.error(`${this.name} connection error:`, error);
this.emit('error', error);
});
this.client.on('offline', () => {
console.log(`${this.name} went offline`);
this.emit('offline');
});
this.client.on('reconnect', () => {
console.log(`${this.name} reconnecting...`);
});
return this.client;
}
start() {
if (!this.client) {
this.connect();
}
this.client.on('connect', () => {
if (!this.isRunning) {
this.isRunning = true;
this.startSimulation();
}
});
}
stop() {
this.isRunning = false;
if (this.client) {
this.publishStatus('offline', 'graceful_shutdown');
this.client.end();
}
}
publishStatus(status, reason = '') {
const statusMessage = {
deviceId: this.id,
deviceType: this.constructor.name,
location: this.location,
status: status,
timestamp: new Date().toISOString(),
reason: reason
};
this.client.publish(
`devices/${this.id}/status`,
JSON.stringify(statusMessage),
{ qos: 1, retain: true }
);
}
publishData(topic, data) {
const message = {
deviceId: this.id,
deviceType: this.constructor.name,
location: this.location,
timestamp: new Date().toISOString(),
...data
};
// Simulate occasional errors
if (Math.random() < this.errorRate) {
message.error = true;
message.errorMessage = 'Sensor read error';
}
this.client.publish(topic, JSON.stringify(message), { qos: 1 });
this.emit('data', message);
}
startSimulation() {
// Override in subclasses
console.log(`Starting simulation for ${this.name}`);
}
}
// Temperature Sensor
class TemperatureSensor extends IoTSensor {
constructor(config) {
super(config);
this.baseTemp = config.baseTemp || 22;
this.variance = config.variance || 5;
}
startSimulation() {
console.log(`🌡️ Starting temperature simulation for ${this.name}`);
this.simulationInterval = setInterval(() => {
if (!this.isRunning) return;
const temperature = this.baseTemp + (Math.random() - 0.5) * this.variance * 2;
const humidity = 40 + (Math.random() - 0.5) * 20;
this.publishData(`sensors/temperature/${this.location}`, {
temperature: parseFloat(temperature.toFixed(2)),
unit: 'celsius',
humidity: parseFloat(humidity.toFixed(2)),
unitHumidity: 'percent'
});
}, this.interval);
}
}
// Motion Sensor
class MotionSensor extends IoTSensor {
constructor(config) {
super(config);
this.sensitivity = config.sensitivity || 0.3;
this.lastMotion = null;
}
startSimulation() {
console.log(`🚪 Starting motion detection for ${this.name}`);
this.simulationInterval = setInterval(() => {
if (!this.isRunning) return;
const motionDetected = Math.random() < this.sensitivity;
if (motionDetected) {
this.lastMotion = new Date().toISOString();
this.publishData(`sensors/motion/${this.location}`, {
motion: true,
detectionTime: this.lastMotion,
confidence: Math.random()
});
}
}, this.interval);
}
}
// Light Sensor
class LightSensor extends IoTSensor {
constructor(config) {
super(config);
this.baseLux = config.baseLux || 300;
}
startSimulation() {
console.log(`💡 Starting light sensor for ${this.name}`);
this.simulationInterval = setInterval(() => {
if (!this.isRunning) return;
const hour = new Date().getHours();
let lux;
// Simulate day/night cycle
if (hour >= 6 && hour <= 18) {
lux = this.baseLux + Math.random() * 500;
} else {
lux = Math.random() * 50;
}
this.publishData(`sensors/light/${this.location}`, {
illuminance: parseFloat(lux.toFixed(2)),
unit: 'lux',
timeOfDay: hour >= 6 && hour <= 18 ? 'day' : 'night'
});
}, this.interval);
}
}
// Door/Window Sensor
class DoorSensor extends IoTSensor {
constructor(config) {
super(config);
this.isOpen = false;
this.changeProbability = config.changeProbability || 0.1;
}
startSimulation() {
console.log(`🚪 Starting door sensor for ${this.name}`);
this.simulationInterval = setInterval(() => {
if (!this.isRunning) return;
const shouldChange = Math.random() < this.changeProbability;
if (shouldChange) {
this.isOpen = !this.isOpen;
this.publishData(`sensors/door/${this.location}`, {
state: this.isOpen ? 'open' : 'closed',
lastChange: new Date().toISOString(),
batteryLevel: 80 + Math.random() * 20
});
}
}, this.interval);
}
}
// Smart Camera
class SmartCamera extends IoTSensor {
constructor(config) {
super(config);
this.detectionTypes = ['person', 'vehicle', 'animal', 'package'];
this.detectionRate = config.detectionRate || 0.2;
}
startSimulation() {
console.log(`📹 Starting smart camera for ${this.name}`);
this.simulationInterval = setInterval(() => {
if (!this.isRunning) return;
if (Math.random() < this.detectionRate) {
const detection = this.detectionTypes[Math.floor(Math.random() * this.detectionTypes.length)];
const confidence = 0.7 + Math.random() * 0.3;
this.publishData(`cameras/detection/${this.location}`, {
detectionType: detection,
confidence: parseFloat(confidence.toFixed(2)),
snapshotUrl: `https://example.com/snapshots/${this.id}/${Date.now()}.jpg`,
recording: Math.random() > 0.5
});
}
}, this.interval);
}
}
// Sensor Manager
class SensorManager {
constructor() {
this.sensors = [];
this.alertThresholds = new Map();
}
addSensor(sensor) {
this.sensors.push(sensor);
// Set up event listeners
sensor.on('data', (data) => {
this.processSensorData(data);
});
sensor.on('error', (error) => {
console.error(`Sensor ${sensor.id} error:`, error);
});
sensor.on('offline', () => {
console.log(`Sensor ${sensor.id} went offline`);
});
return sensor;
}
processSensorData(data) {
// Check for alerts
if (data.temperature && data.temperature > 35) {
this.sendAlert({
type: 'temperature_high',
deviceId: data.deviceId,
location: data.location,
value: data.temperature,
threshold: 35,
timestamp: data.timestamp
});
}
if (data.motion && data.motion) {
console.log(`🚨 Motion detected at ${data.location}`);
}
if (data.state === 'open' && data.deviceType === 'DoorSensor') {
this.sendAlert({
type: 'door_open',
deviceId: data.deviceId,
location: data.location,
timestamp: data.timestamp
});
}
}
sendAlert(alert) {
const alertMessage = {
id: `alert_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
...alert,
severity: alert.type.includes('temperature') ? 'warning' : 'info'
};
console.log(`🚨 ALERT: ${JSON.stringify(alertMessage, null, 2)}`);
// Publish alert to MQTT
const alertClient = mqtt.connect(`mqtt://${MQTT_BROKER}:${MQTT_PORT}`);
alertClient.on('connect', () => {
alertClient.publish(
`alerts/${alert.location}/${alert.type}`,
JSON.stringify(alertMessage),
{ qos: 1 }
);
alertClient.end();
});
}
startAll() {
console.log('\n🚀 Starting all sensors...');
this.sensors.forEach(sensor => sensor.start());
}
stopAll() {
console.log('\n🛑 Stopping all sensors...');
this.sensors.forEach(sensor => sensor.stop());
}
getStatus() {
return {
totalSensors: this.sensors.length,
onlineSensors: this.sensors.filter(s => s.isRunning).length,
sensorTypes: [...new Set(this.sensors.map(s => s.constructor.name))]
};
}
}
// Create and configure sensors
function createSmartHome() {
const manager = new SensorManager();
// Living room sensors
manager.addSensor(new TemperatureSensor({
id: 'temp_001',
name: 'Living Room Temp Sensor',
location: 'living_room',
interval: 5000,
baseTemp: 22,
variance: 3
}));
manager.addSensor(new MotionSensor({
id: 'motion_001',
name: 'Living Room Motion Sensor',
location: 'living_room',
interval: 2000,
sensitivity: 0.3
}));
manager.addSensor(new LightSensor({
id: 'light_001',
name: 'Living Room Light Sensor',
location: 'living_room',
interval: 3000,
baseLux: 400
}));
// Bedroom sensors
manager.addSensor(new TemperatureSensor({
id: 'temp_002',
name: 'Bedroom Temp Sensor',
location: 'bedroom',
interval: 6000,
baseTemp: 20,
variance: 2
}));
manager.addSensor(new DoorSensor({
id: 'door_001',
name: 'Bedroom Door Sensor',
location: 'bedroom',
interval: 4000,
changeProbability: 0.1
}));
// Front door
manager.addSensor(new DoorSensor({
id: 'door_002',
name: 'Front Door Sensor',
location: 'front_door',
interval: 2000,
changeProbability: 0.05
}));
// Outdoor camera
manager.addSensor(new SmartCamera({
id: 'cam_001',
name: 'Front Door Camera',
location: 'front_door',
interval: 5000,
detectionRate: 0.3
}));
return manager;
}
// Run the simulation
if (require.main === module) {
console.log('🏠 IoT Smart Home Simulation\n');
const smartHome = createSmartHome();
// Start simulation
smartHome.startAll();
// Display status every 10 seconds
setInterval(() => {
const status = smartHome.getStatus();
console.log(`\n📊 Status: ${status.onlineSensors}/${status.totalSensors} sensors online`);
console.log(` Types: ${status.sensorTypes.join(', ')}`);
}, 10000);
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\n👋 Shutting down smart home simulation...');
smartHome.stopAll();
setTimeout(() => {
process.exit(0);
}, 2000);
});
}
module.exports = {
IoTSensor,
TemperatureSensor,
MotionSensor,
LightSensor,
DoorSensor,
SmartCamera,
SensorManager
};
💻 MQTT Python-Implementierung python
🟡 intermediate
⭐⭐⭐
Python MQTT-Client-Beispiele mit Paho-MQTT-Bibliothek für professionelle IoT-Anwendungen
⏱️ 30 min
🏷️ mqtt, python, iot, smart home, monitoring
Prerequisites:
Python, MQTT basics, Threading
# MQTT Python Examples
# pip install paho-mqtt
import paho.mqtt.client as mqtt
import json
import time
import threading
import logging
from datetime import datetime
from typing import Dict, List, Optional, Callable
import random
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# MQTT Configuration
MQTT_BROKER = "mqtt.eclipse.org"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60
class MQTTClient:
"""Advanced MQTT client with reconnection and message handling"""
def __init__(self, client_id: str = None, username: str = None, password: str = None):
self.client_id = client_id or f"python_client_{random.randint(1000, 9999)}"
self.username = username
self.password = password
self.client = mqtt.Client(client_id=self.client_id)
self.connected = False
self.message_callbacks = {}
self.subscription_topics = set()
self.last_will_topic = None
self.last_will_message = None
# Set up authentication
if username and password:
self.client.username_pw_set(username, password)
# Set up event handlers
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_publish = self._on_publish
self.client.on_subscribe = self._on_subscribe
self.client.on_unsubscribe = self._on_unsubscribe
def _on_connect(self, client, userdata, flags, rc):
"""Called when the client connects to the broker"""
if rc == 0:
self.connected = True
logger.info(f"Connected to MQTT broker with client ID: {self.client_id}")
# Resubscribe to all topics
for topic in self.subscription_topics:
self.subscribe(topic)
else:
logger.error(f"Failed to connect to MQTT broker. Return code: {rc}")
def _on_disconnect(self, client, userdata, rc):
"""Called when the client disconnects"""
self.connected = False
if rc != 0:
logger.warning(f"Unexpected disconnection from MQTT broker. Return code: {rc}")
else:
logger.info("Disconnected from MQTT broker")
def _on_message(self, client, userdata, msg):
"""Called when a message is received"""
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
# Try to parse as JSON first
try:
data = json.loads(payload)
logger.info(f"Received JSON message on {topic}: {data}")
except json.JSONDecodeError:
data = payload
logger.info(f"Received message on {topic}: {payload}")
# Call registered callback if exists
if topic in self.message_callbacks:
self.message_callbacks[topic](topic, data)
else:
# Check for wildcard callbacks
for registered_topic in self.message_callbacks:
if self._topic_matches(registered_topic, topic):
self.message_callbacks[registered_topic](topic, data)
break
except Exception as e:
logger.error(f"Error processing message: {e}")
def _on_publish(self, client, userdata, mid):
"""Called when a message is published"""
logger.debug(f"Message published with mid: {mid}")
def _on_subscribe(self, client, userdata, mid, granted_qos):
"""Called when the client subscribes to a topic"""
logger.info(f"Subscribed with mid: {mid}, QoS: {granted_qos}")
def _on_unsubscribe(self, client, userdata, mid):
"""Called when the client unsubscribes from a topic"""
logger.info(f"Unsubscribed with mid: {mid}")
def _topic_matches(self, pattern: str, topic: str) -> bool:
"""Check if a topic matches a pattern (supports wildcards)"""
pattern_parts = pattern.split('/')
topic_parts = topic.split('/')
if len(pattern_parts) != len(topic_parts):
if pattern_parts[-1] != '#':
return False
for i, (pattern_part, topic_part) in enumerate(zip(pattern_parts, topic_parts)):
if pattern_part == '+':
continue
elif pattern_part == '#':
return True
elif pattern_part != topic_part:
return False
return True
def set_last_will(self, topic: str, message: str, qos: int = 1, retain: bool = False):
"""Set the Last Will and Testament message"""
self.last_will_topic = topic
self.last_will_message = message
self.client.will_set(topic, message.encode('utf-8'), qos, retain)
def connect(self, broker: str = MQTT_BROKER, port: int = MQTT_PORT,
keepalive: int = MQTT_KEEPALIVE) -> bool:
"""Connect to the MQTT broker"""
try:
self.client.connect(broker, port, keepalive)
self.client.loop_start()
# Wait for connection
timeout = 10 # seconds
start_time = time.time()
while not self.connected and time.time() - start_time < timeout:
time.sleep(0.1)
return self.connected
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
return False
def disconnect(self):
"""Disconnect from the MQTT broker"""
if self.connected:
self.client.loop_stop()
self.client.disconnect()
def subscribe(self, topic: str, qos: int = 1, callback: Callable = None) -> bool:
"""Subscribe to a topic"""
try:
result, mid = self.client.subscribe(topic, qos)
if result == mqtt.MQTT_ERR_SUCCESS:
self.subscription_topics.add(topic)
if callback:
self.message_callbacks[topic] = callback
logger.info(f"Subscribed to topic: {topic}")
return True
else:
logger.error(f"Failed to subscribe to topic: {topic}")
return False
except Exception as e:
logger.error(f"Error subscribing to topic: {e}")
return False
def unsubscribe(self, topic: str) -> bool:
"""Unsubscribe from a topic"""
try:
result, mid = self.client.unsubscribe(topic)
if result == mqtt.MQTT_ERR_SUCCESS:
self.subscription_topics.discard(topic)
if topic in self.message_callbacks:
del self.message_callbacks[topic]
logger.info(f"Unsubscribed from topic: {topic}")
return True
else:
logger.error(f"Failed to unsubscribe from topic: {topic}")
return False
except Exception as e:
logger.error(f"Error unsubscribing from topic: {e}")
return False
def publish(self, topic: str, payload, qos: int = 1, retain: bool = False) -> bool:
"""Publish a message to a topic"""
try:
if isinstance(payload, dict) or isinstance(payload, list):
payload = json.dumps(payload)
result = self.client.publish(topic, payload.encode('utf-8'), qos, retain)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"Published to {topic}: {payload}")
return True
else:
logger.error(f"Failed to publish to {topic}")
return False
except Exception as e:
logger.error(f"Error publishing message: {e}")
return False
def add_callback(self, topic: str, callback: Callable):
"""Add a callback for a specific topic"""
self.message_callbacks[topic] = callback
# Example 1: Temperature Monitor
class TemperatureMonitor:
"""Monitors temperature sensors and alerts on thresholds"""
def __init__(self, client: MQTTClient):
self.client = client
self.temperature_readings = {}
self.alert_threshold = 30.0 # Celsius
self.low_threshold = 10.0
# Subscribe to temperature topics
self.client.subscribe("sensors/temperature/+", qos=1, callback=self.handle_temperature)
def handle_temperature(self, topic: str, data: dict):
"""Handle incoming temperature data"""
try:
location = topic.split('/')[-1]
temperature = float(data.get('temperature', 0))
timestamp = data.get('timestamp', datetime.now().isoformat())
# Store reading
self.temperature_readings[location] = {
'temperature': temperature,
'timestamp': timestamp
}
# Check thresholds
if temperature > self.alert_threshold:
self.send_alert(location, temperature, "HIGH")
elif temperature < self.low_threshold:
self.send_alert(location, temperature, "LOW")
else:
logger.info(f"🌡️ {location}: {temperature}°C - Normal")
except Exception as e:
logger.error(f"Error handling temperature data: {e}")
def send_alert(self, location: str, temperature: float, alert_type: str):
"""Send temperature alert"""
alert = {
'type': 'temperature_alert',
'alert_type': alert_type,
'location': location,
'temperature': temperature,
'threshold': self.alert_threshold if alert_type == "HIGH" else self.low_threshold,
'timestamp': datetime.now().isoformat()
}
self.client.publish(f"alerts/{location}/temperature", alert)
logger.warning(f"🚨 Temperature alert - {location}: {temperature}°C ({alert_type})")
# Example 2: Smart Home Controller
class SmartHomeController:
"""Controls smart home devices based on sensor data"""
def __init__(self, client: MQTTClient):
self.client = client
self.devices = {
'living_room_light': False,
'bedroom_light': False,
'thermostat': 22,
'security_system': False
}
# Subscribe to sensor and control topics
self.client.subscribe("sensors/motion/+", callback=self.handle_motion)
self.client.subscribe("sensors/light/+", callback=self.handle_light)
self.client.subscribe("control/+", callback=self.handle_control)
self.client.subscribe("devices/+/status", callback=self.handle_device_status)
def handle_motion(self, topic: str, data: dict):
"""Handle motion sensor data"""
location = topic.split('/')[-1]
if data.get('motion', False):
logger.info(f"🚶 Motion detected in {location}")
# Turn on lights when motion is detected
if location == 'living_room':
self.toggle_device('living_room_light', True)
elif location == 'bedroom':
self.toggle_device('bedroom_light', True)
# Activate security if outside business hours
hour = datetime.now().hour
if hour < 6 or hour > 22:
self.toggle_device('security_system', True)
def handle_light(self, topic: str, data: dict):
"""Handle light sensor data"""
location = topic.split('/')[-1]
illuminance = float(data.get('illuminance', 0))
# Automatic lighting based on ambient light
if illuminance < 100 and location == 'living_room':
self.toggle_device('living_room_light', True)
logger.info(f"💡 Low light detected in {location}, turning on lights")
elif illuminance > 300 and location == 'living_room':
self.toggle_device('living_room_light', False)
logger.info(f"☀️ Sufficient light in {location}, turning off lights")
def handle_control(self, topic: str, data: dict):
"""Handle control commands"""
try:
command = data.get('command')
device = data.get('device')
value = data.get('value')
if command == 'toggle':
self.toggle_device(device, value)
elif command == 'set_thermostat':
self.set_thermostat(value)
elif command == 'security':
self.toggle_device('security_system', value)
logger.info(f"🎮 Control command received: {command} for {device}")
except Exception as e:
logger.error(f"Error handling control command: {e}")
def handle_device_status(self, topic: str, data: dict):
"""Handle device status updates"""
device_id = topic.split('/')[1]
status = data.get('status')
if status == 'offline':
logger.warning(f"⚠️ Device {device_id} is offline")
# Could trigger alert or attempt reconnection here
def toggle_device(self, device: str, state: bool):
"""Toggle a device on/off"""
if device in self.devices:
self.devices[device] = state
# Publish device state change
message = {
'device': device,
'state': state,
'timestamp': datetime.now().isoformat()
}
self.client.publish(f"devices/{device}/state", message)
logger.info(f"🔌 {device} turned {'on' if state else 'off'}")
def set_thermostat(self, temperature: float):
"""Set thermostat temperature"""
self.devices['thermostat'] = temperature
message = {
'device': 'thermostat',
'temperature': temperature,
'timestamp': datetime.now().isoformat()
}
self.client.publish("devices/thermostat/setpoint", message)
logger.info(f"🌡️ Thermostat set to {temperature}°C")
# Example 3: Data Logger
class MQTTDataLogger:
"""Logs MQTT data to file or database"""
def __init__(self, client: MQTTClient, log_file: str = "mqtt_data.log"):
self.client = client
self.log_file = log_file
# Subscribe to all sensor data
self.client.subscribe("sensors/+", callback=self.log_sensor_data)
self.client.subscribe("devices/+/state", callback=self.log_device_state)
def log_sensor_data(self, topic: str, data):
"""Log sensor data"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'topic': topic,
'type': 'sensor_data',
'data': data
}
self._write_log(log_entry)
def log_device_state(self, topic: str, data):
"""Log device state changes"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'topic': topic,
'type': 'device_state',
'data': data
}
self._write_log(log_entry)
def _write_log(self, log_entry: dict):
"""Write log entry to file"""
try:
with open(self.log_file, 'a') as f:
f.write(json.dumps(log_entry) + '
')
except Exception as e:
logger.error(f"Error writing to log file: {e}")
# Main application
def main():
logger.info("🚀 Starting MQTT Python Application")
# Create MQTT client with Last Will
client = MQTTClient(client_id="python_controller")
client.set_last_will(
"controllers/python_controller/status",
json.dumps({
'status': 'offline',
'timestamp': datetime.now().isoformat(),
'reason': 'unexpected_disconnect'
}),
qos=1,
retain=True
)
# Connect to broker
if not client.connect():
logger.error("Failed to connect to MQTT broker")
return
# Publish online status
client.publish(
"controllers/python_controller/status",
json.dumps({
'status': 'online',
'timestamp': datetime.now().isoformat()
}),
retain=True
)
# Initialize components
temperature_monitor = TemperatureMonitor(client)
smart_home_controller = SmartHomeController(client)
data_logger = MQTTDataLogger(client)
# Simulate some device updates
def simulate_devices():
while True:
# Simulate temperature sensor
temp_data = {
'temperature': round(random.uniform(15, 35), 2),
'timestamp': datetime.now().isoformat()
}
client.publish("sensors/temperature/living_room", temp_data)
# Simulate motion sensor
if random.random() < 0.3:
motion_data = {
'motion': True,
'timestamp': datetime.now().isoformat()
}
client.publish("sensors/motion/living_room", motion_data)
time.sleep(5)
# Start simulation in background thread
simulation_thread = threading.Thread(target=simulate_devices, daemon=True)
simulation_thread.start()
try:
logger.info("✅ Application running. Press Ctrl+C to stop.")
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("👋 Shutting down application...")
# Publish offline status
client.publish(
"controllers/python_controller/status",
json.dumps({
'status': 'offline',
'timestamp': datetime.now().isoformat(),
'reason': 'graceful_shutdown'
}),
retain=True
)
client.disconnect()
logger.info("🔌 Application stopped")
if __name__ == "__main__":
main()