MQTT Nachrichtenprotokoll

MQTT (Message Queuing Telemetry Transport) Protokollbeispiele für IoT und Echtzeit-Nachrichtenübermittlung mit Publish/Subscribe-Patterns

💻 MQTT Grundkonzepte und Anwendung javascript

🟢 simple ⭐⭐

Einführung in das MQTT-Protokoll mit grundlegenden Publisher- und Subscriber-Beispielen

⏱️ 15 min 🏷️ mqtt, iot, messaging, publish-subscribe
Prerequisites: Node.js basics, Understanding of pub/sub patterns
// MQTT Basic Examples using mqtt.js library
// npm install mqtt

const mqtt = require('mqtt');

// MQTT Broker connection options
const options = {
  host: 'mqtt.eclipse.org',  // Public MQTT broker
  port: 1883,
  protocol: 'mqtt',
  keepalive: 60,
  reconnectPeriod: 1000,
  connectTimeout: 30 * 1000,
  will: {
    topic: 'clients/disconnect',
    payload: 'Client disconnected unexpectedly',
    qos: 1,
    retain: false
  }
};

// 1. Basic Publisher Example
function createPublisher() {
  console.log('Creating MQTT Publisher...');

  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Publisher connected to MQTT broker');

    // Publish messages periodically
    let counter = 0;
    setInterval(() => {
      const message = {
        deviceId: 'sensor_001',
        temperature: Math.random() * 30 + 20,
        humidity: Math.random() * 50 + 30,
        timestamp: new Date().toISOString(),
        counter: counter++
      };

      const topic = 'sensors/temperature/room1';
      const payload = JSON.stringify(message);

      // Publish with QoS 1 (at least once delivery)
      client.publish(topic, payload, { qos: 1, retain: false }, (error) => {
        if (error) {
          console.error('Failed to publish message:', error);
        } else {
          console.log(`Message published to ${topic}: ${payload}`);
        }
      });
    }, 2000); // Publish every 2 seconds
  });

  client.on('error', function (error) {
    console.error('MQTT connection error:', error);
  });

  client.on('offline', function () {
    console.log('Publisher offline');
  });

  client.on('reconnect', function () {
    console.log('Publisher reconnecting...');
  });

  return client;
}

// 2. Basic Subscriber Example
function createSubscriber() {
  console.log('Creating MQTT Subscriber...');

  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Subscriber connected to MQTT broker');

    // Subscribe to topics
    const topics = [
      'sensors/temperature/+',  // Wildcard for all rooms
      'sensors/humidity/+',
      'alerts/+/critical',
      'clients/connect'
    ];

    topics.forEach(topic => {
      client.subscribe(topic, { qos: 1 }, (error, granted) => {
        if (error) {
          console.error(`Failed to subscribe to ${topic}:`, error);
        } else {
          console.log(`Subscribed to ${topic} with QoS ${granted[0].qos}`);
        }
      });
    });
  });

  client.on('message', function (topic, message) {
    try {
      const payload = JSON.parse(message.toString());
      console.log(`Received message from ${topic}:`, payload);

      // Process different types of messages
      if (topic.startsWith('sensors/temperature/')) {
        handleTemperatureData(topic, payload);
      } else if (topic.startsWith('alerts/')) {
        handleAlert(topic, payload);
      } else if (topic === 'clients/connect') {
        handleClientConnect(payload);
      }
    } catch (error) {
      console.log(`Received message from ${topic}: ${message.toString()}`);
    }
  });

  client.on('error', function (error) {
    console.error('MQTT connection error:', error);
  });

  return client;
}

// 3. Message Handlers
function handleTemperatureData(topic, data) {
  const room = topic.split('/')[2];
  console.log(`🌡️  Temperature in ${room}: ${data.temperature.toFixed(2)}°C`);

  // Alert if temperature is too high
  if (data.temperature > 40) {
    console.log(`🚨 High temperature alert in ${room}!`);
  }
}

function handleAlert(topic, alert) {
  console.log(`🚨 ALERT: ${alert.message}`);
  console.log(`   Severity: ${alert.severity}`);
  console.log(`   Timestamp: ${alert.timestamp}`);
}

function handleClientConnect(data) {
  console.log(`👤 New client connected: ${data.clientId}`);
}

// 4. MQTT Client with Last Will and Testament (LWT)
function createClientWithLWT(clientId) {
  const clientOptions = {
    ...options,
    clientId: clientId,
    will: {
      topic: `clients/${clientId}/status`,
      payload: JSON.stringify({
        status: 'offline',
        timestamp: new Date().toISOString(),
        reason: 'unexpected_disconnect'
      }),
      qos: 1,
      retain: true
    }
  };

  const client = mqtt.connect(clientOptions);

  client.on('connect', function () {
    console.log(`Client ${clientId} connected`);

    // Publish online status
    const statusMessage = {
      status: 'online',
      timestamp: new Date().toISOString()
    };

    client.publish(
      `clients/${clientId}/status`,
      JSON.stringify(statusMessage),
      { qos: 1, retain: true }
    );
  });

  client.on('close', function () {
    console.log(`Client ${clientId} disconnected gracefully`);

    // Publish offline status
    const statusMessage = {
      status: 'offline',
      timestamp: new Date().toISOString(),
      reason: 'graceful_disconnect'
    };

    client.publish(
      `clients/${clientId}/status`,
      JSON.stringify(statusMessage),
      { qos: 1, retain: true }
    );
  });

  return client;
}

// 5. Retained Messages Example
function demonstrateRetainedMessages() {
  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Demonstrating retained messages...');

    // Publish retained configuration data
    const config = {
      version: '1.0.0',
      updateInterval: 5000,
      maxRetries: 3,
      features: ['temperature', 'humidity', 'pressure']
    };

    client.publish(
      'config/sensor_001',
      JSON.stringify(config),
      { qos: 1, retain: true },
      () => {
        console.log('Configuration published with retain flag');
        console.log('New subscribers will immediately receive this message');
      }
    );
  });

  return client;
}

// 6. Quality of Service (QoS) Levels Example
function demonstrateQoS() {
  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Demonstrating QoS levels...');

    const messages = [
      { topic: 'qos/demo/0', payload: 'QoS 0 - At most once', qos: 0 },
      { topic: 'qos/demo/1', payload: 'QoS 1 - At least once', qos: 1 },
      { topic: 'qos/demo/2', payload: 'QoS 2 - Exactly once', qos: 2 }
    ];

    messages.forEach(({ topic, payload, qos }) => {
      client.publish(topic, payload, { qos }, (error) => {
        if (!error) {
          console.log(`Published with QoS ${qos}: ${payload}`);
        }
      });
    });
  });

  return client;
}

// 7. Secure MQTT (MQTTS) Example
function createSecureClient() {
  const secureOptions = {
    host: 'mqtt.example.com',
    port: 8883,
    protocol: 'mqtts',
    rejectUnauthorized: true,
    ca: fs.readFileSync('./ca.crt'),
    cert: fs.readFileSync('./client.crt'),
    key: fs.readFileSync('./client.key'),
    clientId: `secure_client_${Math.random().toString(16).substr(2, 8)}`
  };

  const client = mqtt.connect(secureOptions);

  client.on('connect', function () {
    console.log('Secure MQTT connection established');
  });

  return client;
}

// 8. Topic Patterns and Wildcards
function demonstrateTopicPatterns() {
  const client = mqtt.connect(options);

  client.on('connect', function () {
    console.log('Demonstrating topic patterns...');

    // Subscribe using different wildcard patterns
    const subscriptions = [
      { topic: 'sensors/+/temperature', qos: 1 },    // Single-level wildcard
      { topic: 'devices/room1/#', qos: 1 },          // Multi-level wildcard
      { topic: 'alerts/+/critical', qos: 1 },        // Mixed pattern
      { topic: 'system/status', qos: 1 }             // Exact match
    ];

    subscriptions.forEach(({ topic, qos }) => {
      client.subscribe(topic, { qos }, (error) => {
        if (!error) {
          console.log(`Subscribed to pattern: ${topic}`);
        }
      });
    });
  });

  return client;
}

// Run examples
if (require.main === module) {
  console.log('=== MQTT Examples ===\n');

  // Start publisher
  const publisher = createPublisher();

  // Start subscriber
  setTimeout(() => {
    const subscriber = createSubscriber();
  }, 1000);

  // Demonstrate other features
  setTimeout(() => {
    demonstrateRetainedMessages();
    demonstrateQoS();
  }, 2000);

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down MQTT clients...');
    publisher.end();
    process.exit(0);
  });
}

module.exports = {
  createPublisher,
  createSubscriber,
  createClientWithLWT,
  demonstrateRetainedMessages,
  demonstrateQoS,
  createSecureClient,
  demonstrateTopicPatterns
};

💻 IoT-Sensor-Simulation mit MQTT javascript

🟡 intermediate ⭐⭐⭐

Vollständige IoT-Sensor-Simulation mit mehreren Gerätetypen und Echtzeit-Datenstreaming

⏱️ 25 min 🏷️ mqtt, iot, sensors, simulation, smart home
Prerequisites: JavaScript, MQTT basics, Event-driven programming
// IoT Sensor Simulation with MQTT
// Simulates multiple types of IoT sensors publishing data via MQTT

const mqtt = require('mqtt');
const { EventEmitter } = require('events');

// MQTT Configuration
const MQTT_BROKER = 'mqtt.eclipse.org';
const MQTT_PORT = 1883;

// Base class for IoT sensors
class IoTSensor extends EventEmitter {
  constructor(config) {
    super();
    this.id = config.id;
    this.name = config.name;
    this.location = config.location;
    this.interval = config.interval || 5000;
    this.client = null;
    this.isRunning = false;
    this.errorRate = config.errorRate || 0.05; // 5% error rate
  }

  connect() {
    const options = {
      host: MQTT_BROKER,
      port: MQTT_PORT,
      clientId: `sensor_${this.id}`,
      will: {
        topic: `devices/${this.id}/status`,
        payload: JSON.stringify({
          status: 'offline',
          timestamp: new Date().toISOString(),
          reason: 'unexpected_disconnect'
        }),
        qos: 1,
        retain: true
      }
    };

    this.client = mqtt.connect(options);

    this.client.on('connect', () => {
      console.log(`${this.constructor.name} ${this.name} connected`);
      this.publishStatus('online');
      this.emit('connected');
    });

    this.client.on('error', (error) => {
      console.error(`${this.name} connection error:`, error);
      this.emit('error', error);
    });

    this.client.on('offline', () => {
      console.log(`${this.name} went offline`);
      this.emit('offline');
    });

    this.client.on('reconnect', () => {
      console.log(`${this.name} reconnecting...`);
    });

    return this.client;
  }

  start() {
    if (!this.client) {
      this.connect();
    }

    this.client.on('connect', () => {
      if (!this.isRunning) {
        this.isRunning = true;
        this.startSimulation();
      }
    });
  }

  stop() {
    this.isRunning = false;
    if (this.client) {
      this.publishStatus('offline', 'graceful_shutdown');
      this.client.end();
    }
  }

  publishStatus(status, reason = '') {
    const statusMessage = {
      deviceId: this.id,
      deviceType: this.constructor.name,
      location: this.location,
      status: status,
      timestamp: new Date().toISOString(),
      reason: reason
    };

    this.client.publish(
      `devices/${this.id}/status`,
      JSON.stringify(statusMessage),
      { qos: 1, retain: true }
    );
  }

  publishData(topic, data) {
    const message = {
      deviceId: this.id,
      deviceType: this.constructor.name,
      location: this.location,
      timestamp: new Date().toISOString(),
      ...data
    };

    // Simulate occasional errors
    if (Math.random() < this.errorRate) {
      message.error = true;
      message.errorMessage = 'Sensor read error';
    }

    this.client.publish(topic, JSON.stringify(message), { qos: 1 });
    this.emit('data', message);
  }

  startSimulation() {
    // Override in subclasses
    console.log(`Starting simulation for ${this.name}`);
  }
}

// Temperature Sensor
class TemperatureSensor extends IoTSensor {
  constructor(config) {
    super(config);
    this.baseTemp = config.baseTemp || 22;
    this.variance = config.variance || 5;
  }

  startSimulation() {
    console.log(`🌡️  Starting temperature simulation for ${this.name}`);

    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      const temperature = this.baseTemp + (Math.random() - 0.5) * this.variance * 2;
      const humidity = 40 + (Math.random() - 0.5) * 20;

      this.publishData(`sensors/temperature/${this.location}`, {
        temperature: parseFloat(temperature.toFixed(2)),
        unit: 'celsius',
        humidity: parseFloat(humidity.toFixed(2)),
        unitHumidity: 'percent'
      });
    }, this.interval);
  }
}

// Motion Sensor
class MotionSensor extends IoTSensor {
  constructor(config) {
    super(config);
    this.sensitivity = config.sensitivity || 0.3;
    this.lastMotion = null;
  }

  startSimulation() {
    console.log(`🚪 Starting motion detection for ${this.name}`);

    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      const motionDetected = Math.random() < this.sensitivity;

      if (motionDetected) {
        this.lastMotion = new Date().toISOString();
        this.publishData(`sensors/motion/${this.location}`, {
          motion: true,
          detectionTime: this.lastMotion,
          confidence: Math.random()
        });
      }
    }, this.interval);
  }
}

// Light Sensor
class LightSensor extends IoTSensor {
  constructor(config) {
    super(config);
    this.baseLux = config.baseLux || 300;
  }

  startSimulation() {
    console.log(`💡 Starting light sensor for ${this.name}`);

    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      const hour = new Date().getHours();
      let lux;

      // Simulate day/night cycle
      if (hour >= 6 && hour <= 18) {
        lux = this.baseLux + Math.random() * 500;
      } else {
        lux = Math.random() * 50;
      }

      this.publishData(`sensors/light/${this.location}`, {
        illuminance: parseFloat(lux.toFixed(2)),
        unit: 'lux',
        timeOfDay: hour >= 6 && hour <= 18 ? 'day' : 'night'
      });
    }, this.interval);
  }
}

// Door/Window Sensor
class DoorSensor extends IoTSensor {
  constructor(config) {
    super(config);
    this.isOpen = false;
    this.changeProbability = config.changeProbability || 0.1;
  }

  startSimulation() {
    console.log(`🚪 Starting door sensor for ${this.name}`);

    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      const shouldChange = Math.random() < this.changeProbability;

      if (shouldChange) {
        this.isOpen = !this.isOpen;

        this.publishData(`sensors/door/${this.location}`, {
          state: this.isOpen ? 'open' : 'closed',
          lastChange: new Date().toISOString(),
          batteryLevel: 80 + Math.random() * 20
        });
      }
    }, this.interval);
  }
}

// Smart Camera
class SmartCamera extends IoTSensor {
  constructor(config) {
    super(config);
    this.detectionTypes = ['person', 'vehicle', 'animal', 'package'];
    this.detectionRate = config.detectionRate || 0.2;
  }

  startSimulation() {
    console.log(`📹 Starting smart camera for ${this.name}`);

    this.simulationInterval = setInterval(() => {
      if (!this.isRunning) return;

      if (Math.random() < this.detectionRate) {
        const detection = this.detectionTypes[Math.floor(Math.random() * this.detectionTypes.length)];
        const confidence = 0.7 + Math.random() * 0.3;

        this.publishData(`cameras/detection/${this.location}`, {
          detectionType: detection,
          confidence: parseFloat(confidence.toFixed(2)),
          snapshotUrl: `https://example.com/snapshots/${this.id}/${Date.now()}.jpg`,
          recording: Math.random() > 0.5
        });
      }
    }, this.interval);
  }
}

// Sensor Manager
class SensorManager {
  constructor() {
    this.sensors = [];
    this.alertThresholds = new Map();
  }

  addSensor(sensor) {
    this.sensors.push(sensor);

    // Set up event listeners
    sensor.on('data', (data) => {
      this.processSensorData(data);
    });

    sensor.on('error', (error) => {
      console.error(`Sensor ${sensor.id} error:`, error);
    });

    sensor.on('offline', () => {
      console.log(`Sensor ${sensor.id} went offline`);
    });

    return sensor;
  }

  processSensorData(data) {
    // Check for alerts
    if (data.temperature && data.temperature > 35) {
      this.sendAlert({
        type: 'temperature_high',
        deviceId: data.deviceId,
        location: data.location,
        value: data.temperature,
        threshold: 35,
        timestamp: data.timestamp
      });
    }

    if (data.motion && data.motion) {
      console.log(`🚨 Motion detected at ${data.location}`);
    }

    if (data.state === 'open' && data.deviceType === 'DoorSensor') {
      this.sendAlert({
        type: 'door_open',
        deviceId: data.deviceId,
        location: data.location,
        timestamp: data.timestamp
      });
    }
  }

  sendAlert(alert) {
    const alertMessage = {
      id: `alert_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
      ...alert,
      severity: alert.type.includes('temperature') ? 'warning' : 'info'
    };

    console.log(`🚨 ALERT: ${JSON.stringify(alertMessage, null, 2)}`);

    // Publish alert to MQTT
    const alertClient = mqtt.connect(`mqtt://${MQTT_BROKER}:${MQTT_PORT}`);
    alertClient.on('connect', () => {
      alertClient.publish(
        `alerts/${alert.location}/${alert.type}`,
        JSON.stringify(alertMessage),
        { qos: 1 }
      );
      alertClient.end();
    });
  }

  startAll() {
    console.log('\n🚀 Starting all sensors...');
    this.sensors.forEach(sensor => sensor.start());
  }

  stopAll() {
    console.log('\n🛑 Stopping all sensors...');
    this.sensors.forEach(sensor => sensor.stop());
  }

  getStatus() {
    return {
      totalSensors: this.sensors.length,
      onlineSensors: this.sensors.filter(s => s.isRunning).length,
      sensorTypes: [...new Set(this.sensors.map(s => s.constructor.name))]
    };
  }
}

// Create and configure sensors
function createSmartHome() {
  const manager = new SensorManager();

  // Living room sensors
  manager.addSensor(new TemperatureSensor({
    id: 'temp_001',
    name: 'Living Room Temp Sensor',
    location: 'living_room',
    interval: 5000,
    baseTemp: 22,
    variance: 3
  }));

  manager.addSensor(new MotionSensor({
    id: 'motion_001',
    name: 'Living Room Motion Sensor',
    location: 'living_room',
    interval: 2000,
    sensitivity: 0.3
  }));

  manager.addSensor(new LightSensor({
    id: 'light_001',
    name: 'Living Room Light Sensor',
    location: 'living_room',
    interval: 3000,
    baseLux: 400
  }));

  // Bedroom sensors
  manager.addSensor(new TemperatureSensor({
    id: 'temp_002',
    name: 'Bedroom Temp Sensor',
    location: 'bedroom',
    interval: 6000,
    baseTemp: 20,
    variance: 2
  }));

  manager.addSensor(new DoorSensor({
    id: 'door_001',
    name: 'Bedroom Door Sensor',
    location: 'bedroom',
    interval: 4000,
    changeProbability: 0.1
  }));

  // Front door
  manager.addSensor(new DoorSensor({
    id: 'door_002',
    name: 'Front Door Sensor',
    location: 'front_door',
    interval: 2000,
    changeProbability: 0.05
  }));

  // Outdoor camera
  manager.addSensor(new SmartCamera({
    id: 'cam_001',
    name: 'Front Door Camera',
    location: 'front_door',
    interval: 5000,
    detectionRate: 0.3
  }));

  return manager;
}

// Run the simulation
if (require.main === module) {
  console.log('🏠 IoT Smart Home Simulation\n');

  const smartHome = createSmartHome();

  // Start simulation
  smartHome.startAll();

  // Display status every 10 seconds
  setInterval(() => {
    const status = smartHome.getStatus();
    console.log(`\n📊 Status: ${status.onlineSensors}/${status.totalSensors} sensors online`);
    console.log(`   Types: ${status.sensorTypes.join(', ')}`);
  }, 10000);

  // Graceful shutdown
  process.on('SIGINT', () => {
    console.log('\n👋 Shutting down smart home simulation...');
    smartHome.stopAll();
    setTimeout(() => {
      process.exit(0);
    }, 2000);
  });
}

module.exports = {
  IoTSensor,
  TemperatureSensor,
  MotionSensor,
  LightSensor,
  DoorSensor,
  SmartCamera,
  SensorManager
};

💻 MQTT Python-Implementierung python

🟡 intermediate ⭐⭐⭐

Python MQTT-Client-Beispiele mit Paho-MQTT-Bibliothek für professionelle IoT-Anwendungen

⏱️ 30 min 🏷️ mqtt, python, iot, smart home, monitoring
Prerequisites: Python, MQTT basics, Threading
# MQTT Python Examples
# pip install paho-mqtt

import paho.mqtt.client as mqtt
import json
import time
import threading
import logging
from datetime import datetime
from typing import Dict, List, Optional, Callable
import random

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# MQTT Configuration
MQTT_BROKER = "mqtt.eclipse.org"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

class MQTTClient:
    """Advanced MQTT client with reconnection and message handling"""

    def __init__(self, client_id: str = None, username: str = None, password: str = None):
        self.client_id = client_id or f"python_client_{random.randint(1000, 9999)}"
        self.username = username
        self.password = password
        self.client = mqtt.Client(client_id=self.client_id)
        self.connected = False
        self.message_callbacks = {}
        self.subscription_topics = set()
        self.last_will_topic = None
        self.last_will_message = None

        # Set up authentication
        if username and password:
            self.client.username_pw_set(username, password)

        # Set up event handlers
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message
        self.client.on_publish = self._on_publish
        self.client.on_subscribe = self._on_subscribe
        self.client.on_unsubscribe = self._on_unsubscribe

    def _on_connect(self, client, userdata, flags, rc):
        """Called when the client connects to the broker"""
        if rc == 0:
            self.connected = True
            logger.info(f"Connected to MQTT broker with client ID: {self.client_id}")

            # Resubscribe to all topics
            for topic in self.subscription_topics:
                self.subscribe(topic)
        else:
            logger.error(f"Failed to connect to MQTT broker. Return code: {rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Called when the client disconnects"""
        self.connected = False
        if rc != 0:
            logger.warning(f"Unexpected disconnection from MQTT broker. Return code: {rc}")
        else:
            logger.info("Disconnected from MQTT broker")

    def _on_message(self, client, userdata, msg):
        """Called when a message is received"""
        try:
            topic = msg.topic
            payload = msg.payload.decode('utf-8')

            # Try to parse as JSON first
            try:
                data = json.loads(payload)
                logger.info(f"Received JSON message on {topic}: {data}")
            except json.JSONDecodeError:
                data = payload
                logger.info(f"Received message on {topic}: {payload}")

            # Call registered callback if exists
            if topic in self.message_callbacks:
                self.message_callbacks[topic](topic, data)
            else:
                # Check for wildcard callbacks
                for registered_topic in self.message_callbacks:
                    if self._topic_matches(registered_topic, topic):
                        self.message_callbacks[registered_topic](topic, data)
                        break

        except Exception as e:
            logger.error(f"Error processing message: {e}")

    def _on_publish(self, client, userdata, mid):
        """Called when a message is published"""
        logger.debug(f"Message published with mid: {mid}")

    def _on_subscribe(self, client, userdata, mid, granted_qos):
        """Called when the client subscribes to a topic"""
        logger.info(f"Subscribed with mid: {mid}, QoS: {granted_qos}")

    def _on_unsubscribe(self, client, userdata, mid):
        """Called when the client unsubscribes from a topic"""
        logger.info(f"Unsubscribed with mid: {mid}")

    def _topic_matches(self, pattern: str, topic: str) -> bool:
        """Check if a topic matches a pattern (supports wildcards)"""
        pattern_parts = pattern.split('/')
        topic_parts = topic.split('/')

        if len(pattern_parts) != len(topic_parts):
            if pattern_parts[-1] != '#':
                return False

        for i, (pattern_part, topic_part) in enumerate(zip(pattern_parts, topic_parts)):
            if pattern_part == '+':
                continue
            elif pattern_part == '#':
                return True
            elif pattern_part != topic_part:
                return False

        return True

    def set_last_will(self, topic: str, message: str, qos: int = 1, retain: bool = False):
        """Set the Last Will and Testament message"""
        self.last_will_topic = topic
        self.last_will_message = message
        self.client.will_set(topic, message.encode('utf-8'), qos, retain)

    def connect(self, broker: str = MQTT_BROKER, port: int = MQTT_PORT,
                keepalive: int = MQTT_KEEPALIVE) -> bool:
        """Connect to the MQTT broker"""
        try:
            self.client.connect(broker, port, keepalive)
            self.client.loop_start()

            # Wait for connection
            timeout = 10  # seconds
            start_time = time.time()
            while not self.connected and time.time() - start_time < timeout:
                time.sleep(0.1)

            return self.connected
        except Exception as e:
            logger.error(f"Failed to connect to MQTT broker: {e}")
            return False

    def disconnect(self):
        """Disconnect from the MQTT broker"""
        if self.connected:
            self.client.loop_stop()
            self.client.disconnect()

    def subscribe(self, topic: str, qos: int = 1, callback: Callable = None) -> bool:
        """Subscribe to a topic"""
        try:
            result, mid = self.client.subscribe(topic, qos)
            if result == mqtt.MQTT_ERR_SUCCESS:
                self.subscription_topics.add(topic)
                if callback:
                    self.message_callbacks[topic] = callback
                logger.info(f"Subscribed to topic: {topic}")
                return True
            else:
                logger.error(f"Failed to subscribe to topic: {topic}")
                return False
        except Exception as e:
            logger.error(f"Error subscribing to topic: {e}")
            return False

    def unsubscribe(self, topic: str) -> bool:
        """Unsubscribe from a topic"""
        try:
            result, mid = self.client.unsubscribe(topic)
            if result == mqtt.MQTT_ERR_SUCCESS:
                self.subscription_topics.discard(topic)
                if topic in self.message_callbacks:
                    del self.message_callbacks[topic]
                logger.info(f"Unsubscribed from topic: {topic}")
                return True
            else:
                logger.error(f"Failed to unsubscribe from topic: {topic}")
                return False
        except Exception as e:
            logger.error(f"Error unsubscribing from topic: {e}")
            return False

    def publish(self, topic: str, payload, qos: int = 1, retain: bool = False) -> bool:
        """Publish a message to a topic"""
        try:
            if isinstance(payload, dict) or isinstance(payload, list):
                payload = json.dumps(payload)

            result = self.client.publish(topic, payload.encode('utf-8'), qos, retain)
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.info(f"Published to {topic}: {payload}")
                return True
            else:
                logger.error(f"Failed to publish to {topic}")
                return False
        except Exception as e:
            logger.error(f"Error publishing message: {e}")
            return False

    def add_callback(self, topic: str, callback: Callable):
        """Add a callback for a specific topic"""
        self.message_callbacks[topic] = callback

# Example 1: Temperature Monitor
class TemperatureMonitor:
    """Monitors temperature sensors and alerts on thresholds"""

    def __init__(self, client: MQTTClient):
        self.client = client
        self.temperature_readings = {}
        self.alert_threshold = 30.0  # Celsius
        self.low_threshold = 10.0

        # Subscribe to temperature topics
        self.client.subscribe("sensors/temperature/+", qos=1, callback=self.handle_temperature)

    def handle_temperature(self, topic: str, data: dict):
        """Handle incoming temperature data"""
        try:
            location = topic.split('/')[-1]
            temperature = float(data.get('temperature', 0))
            timestamp = data.get('timestamp', datetime.now().isoformat())

            # Store reading
            self.temperature_readings[location] = {
                'temperature': temperature,
                'timestamp': timestamp
            }

            # Check thresholds
            if temperature > self.alert_threshold:
                self.send_alert(location, temperature, "HIGH")
            elif temperature < self.low_threshold:
                self.send_alert(location, temperature, "LOW")
            else:
                logger.info(f"🌡️ {location}: {temperature}°C - Normal")

        except Exception as e:
            logger.error(f"Error handling temperature data: {e}")

    def send_alert(self, location: str, temperature: float, alert_type: str):
        """Send temperature alert"""
        alert = {
            'type': 'temperature_alert',
            'alert_type': alert_type,
            'location': location,
            'temperature': temperature,
            'threshold': self.alert_threshold if alert_type == "HIGH" else self.low_threshold,
            'timestamp': datetime.now().isoformat()
        }

        self.client.publish(f"alerts/{location}/temperature", alert)
        logger.warning(f"🚨 Temperature alert - {location}: {temperature}°C ({alert_type})")

# Example 2: Smart Home Controller
class SmartHomeController:
    """Controls smart home devices based on sensor data"""

    def __init__(self, client: MQTTClient):
        self.client = client
        self.devices = {
            'living_room_light': False,
            'bedroom_light': False,
            'thermostat': 22,
            'security_system': False
        }

        # Subscribe to sensor and control topics
        self.client.subscribe("sensors/motion/+", callback=self.handle_motion)
        self.client.subscribe("sensors/light/+", callback=self.handle_light)
        self.client.subscribe("control/+", callback=self.handle_control)
        self.client.subscribe("devices/+/status", callback=self.handle_device_status)

    def handle_motion(self, topic: str, data: dict):
        """Handle motion sensor data"""
        location = topic.split('/')[-1]

        if data.get('motion', False):
            logger.info(f"🚶 Motion detected in {location}")

            # Turn on lights when motion is detected
            if location == 'living_room':
                self.toggle_device('living_room_light', True)
            elif location == 'bedroom':
                self.toggle_device('bedroom_light', True)

            # Activate security if outside business hours
            hour = datetime.now().hour
            if hour < 6 or hour > 22:
                self.toggle_device('security_system', True)

    def handle_light(self, topic: str, data: dict):
        """Handle light sensor data"""
        location = topic.split('/')[-1]
        illuminance = float(data.get('illuminance', 0))

        # Automatic lighting based on ambient light
        if illuminance < 100 and location == 'living_room':
            self.toggle_device('living_room_light', True)
            logger.info(f"💡 Low light detected in {location}, turning on lights")
        elif illuminance > 300 and location == 'living_room':
            self.toggle_device('living_room_light', False)
            logger.info(f"☀️ Sufficient light in {location}, turning off lights")

    def handle_control(self, topic: str, data: dict):
        """Handle control commands"""
        try:
            command = data.get('command')
            device = data.get('device')
            value = data.get('value')

            if command == 'toggle':
                self.toggle_device(device, value)
            elif command == 'set_thermostat':
                self.set_thermostat(value)
            elif command == 'security':
                self.toggle_device('security_system', value)

            logger.info(f"🎮 Control command received: {command} for {device}")

        except Exception as e:
            logger.error(f"Error handling control command: {e}")

    def handle_device_status(self, topic: str, data: dict):
        """Handle device status updates"""
        device_id = topic.split('/')[1]
        status = data.get('status')

        if status == 'offline':
            logger.warning(f"⚠️ Device {device_id} is offline")
            # Could trigger alert or attempt reconnection here

    def toggle_device(self, device: str, state: bool):
        """Toggle a device on/off"""
        if device in self.devices:
            self.devices[device] = state

            # Publish device state change
            message = {
                'device': device,
                'state': state,
                'timestamp': datetime.now().isoformat()
            }

            self.client.publish(f"devices/{device}/state", message)
            logger.info(f"🔌 {device} turned {'on' if state else 'off'}")

    def set_thermostat(self, temperature: float):
        """Set thermostat temperature"""
        self.devices['thermostat'] = temperature

        message = {
            'device': 'thermostat',
            'temperature': temperature,
            'timestamp': datetime.now().isoformat()
        }

        self.client.publish("devices/thermostat/setpoint", message)
        logger.info(f"🌡️ Thermostat set to {temperature}°C")

# Example 3: Data Logger
class MQTTDataLogger:
    """Logs MQTT data to file or database"""

    def __init__(self, client: MQTTClient, log_file: str = "mqtt_data.log"):
        self.client = client
        self.log_file = log_file

        # Subscribe to all sensor data
        self.client.subscribe("sensors/+", callback=self.log_sensor_data)
        self.client.subscribe("devices/+/state", callback=self.log_device_state)

    def log_sensor_data(self, topic: str, data):
        """Log sensor data"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'topic': topic,
            'type': 'sensor_data',
            'data': data
        }

        self._write_log(log_entry)

    def log_device_state(self, topic: str, data):
        """Log device state changes"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'topic': topic,
            'type': 'device_state',
            'data': data
        }

        self._write_log(log_entry)

    def _write_log(self, log_entry: dict):
        """Write log entry to file"""
        try:
            with open(self.log_file, 'a') as f:
                f.write(json.dumps(log_entry) + '
')
        except Exception as e:
            logger.error(f"Error writing to log file: {e}")

# Main application
def main():
    logger.info("🚀 Starting MQTT Python Application")

    # Create MQTT client with Last Will
    client = MQTTClient(client_id="python_controller")
    client.set_last_will(
        "controllers/python_controller/status",
        json.dumps({
            'status': 'offline',
            'timestamp': datetime.now().isoformat(),
            'reason': 'unexpected_disconnect'
        }),
        qos=1,
        retain=True
    )

    # Connect to broker
    if not client.connect():
        logger.error("Failed to connect to MQTT broker")
        return

    # Publish online status
    client.publish(
        "controllers/python_controller/status",
        json.dumps({
            'status': 'online',
            'timestamp': datetime.now().isoformat()
        }),
        retain=True
    )

    # Initialize components
    temperature_monitor = TemperatureMonitor(client)
    smart_home_controller = SmartHomeController(client)
    data_logger = MQTTDataLogger(client)

    # Simulate some device updates
    def simulate_devices():
        while True:
            # Simulate temperature sensor
            temp_data = {
                'temperature': round(random.uniform(15, 35), 2),
                'timestamp': datetime.now().isoformat()
            }
            client.publish("sensors/temperature/living_room", temp_data)

            # Simulate motion sensor
            if random.random() < 0.3:
                motion_data = {
                    'motion': True,
                    'timestamp': datetime.now().isoformat()
                }
                client.publish("sensors/motion/living_room", motion_data)

            time.sleep(5)

    # Start simulation in background thread
    simulation_thread = threading.Thread(target=simulate_devices, daemon=True)
    simulation_thread.start()

    try:
        logger.info("✅ Application running. Press Ctrl+C to stop.")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        logger.info("👋 Shutting down application...")

        # Publish offline status
        client.publish(
            "controllers/python_controller/status",
            json.dumps({
                'status': 'offline',
                'timestamp': datetime.now().isoformat(),
                'reason': 'graceful_shutdown'
            }),
            retain=True
        )

        client.disconnect()
        logger.info("🔌 Application stopped")

if __name__ == "__main__":
    main()