🎯 Рекомендуемые коллекции
Сбалансированные коллекции примеров кода из различных категорий, которые вы можете исследовать
Примеры Разработки Raspberry Pi
Комплексные примеры разработки для Raspberry Pi, включая управление GPIO, работу с датчиками, встраиваемую разработку на Python/C++, проекты IoT и управление кластерами
💻 Основы GPIO на Python python
Базовое управление GPIO с использованием Python и библиотек RPi.GPIO и gpiozero: управление светодиодами, обработка нажатий кнопок и чтение датчиков
# Raspberry Pi GPIO Basics with Python
# Required: sudo apt-get install python3-rpi.gpio python3-gpiozero
import RPi.GPIO as GPIO
import time
from gpiozero import LED, Button, Buzzer, MotionSensor, LightSensor
import random
# GPIO Setup and Basic LED Control
def basic_led_control():
    """Blink the LED on BCM pin 18 five times using RPi.GPIO.

    The pin is driven HIGH then LOW for half a second each, and every
    GPIO channel is released on the way out, even if the loop is interrupted.
    """
    led_pin = 18
    half_period = 0.5  # seconds spent in each of the ON and OFF states

    # BCM numbering; warnings are silenced.
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    GPIO.setup(led_pin, GPIO.OUT)

    print("Basic LED Control Demo")
    print("LED will blink 5 times...")

    try:
        for blink_no in range(1, 6):
            print(f"Blink {blink_no}")
            # ON first, then OFF, each held for the same time.
            for level in (GPIO.HIGH, GPIO.LOW):
                GPIO.output(led_pin, level)
                time.sleep(half_period)
    finally:
        GPIO.cleanup()
# Using gpiozero Library (Simpler)
def gpiozero_led_demo():
    """Run three LED patterns on GPIO 18 with gpiozero: blink, pulse, custom.

    A ``PWMLED`` is used instead of ``LED`` because only the PWM variant
    offers ``pulse()``. Every pattern runs with ``background=False`` so it
    finishes before the next one starts; with the default background thread
    the calls return immediately and the following call cancels the pattern.
    The device is closed at the end so the pin is free for the other demos.
    """
    # Local import: the module-level gpiozero import does not include PWMLED.
    from gpiozero import PWMLED

    print("GPIOzero LED Demo")
    print("LED patterns:")

    led = PWMLED(18)
    try:
        # Simple blink: three half-second flashes.
        led.blink(on_time=0.5, off_time=0.5, n=3, background=False)

        # Fade in/out twice.
        led.pulse(fade_in_time=1, fade_out_time=1, n=2, background=False)

        # Custom pattern: short flash, long pause.
        for _ in range(3):
            led.on()
            time.sleep(0.2)
            led.off()
            time.sleep(0.8)
    finally:
        led.close()
# Button Input Handling
def button_input_demo():
    """Mirror a push button (GPIO 24, active low) onto an LED (GPIO 18).

    The button is polled every 100 ms. The LED and the console message are
    only updated when the button state actually changes. Ctrl+C ends the
    demo; GPIO channels are always released.
    """
    led_pin, button_pin = 18, 24

    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    GPIO.setup(led_pin, GPIO.OUT)
    # Internal pull-up: the input reads LOW while the button is held down.
    GPIO.setup(button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    print("Button Input Demo")
    print("Press the button connected to GPIO 24")
    print("Press Ctrl+C to exit")

    led_on = False
    try:
        while True:
            pressed = GPIO.input(button_pin) == GPIO.LOW
            if pressed != led_on:
                # State changed since the last poll: follow the button.
                led_on = pressed
                if pressed:
                    GPIO.output(led_pin, GPIO.HIGH)
                    print("Button pressed - LED ON")
                else:
                    GPIO.output(led_pin, GPIO.LOW)
                    print("Button released - LED OFF")
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nExiting...")
    finally:
        GPIO.cleanup()
# Button Interrupts
def button_interrupt_demo():
    """Toggle an LED (GPIO 18) on each falling edge of a button (GPIO 24).

    Edge detection is registered with a 200 ms bounce time; the main thread
    just sleeps until Ctrl+C, after which GPIO channels are released.
    """
    led_pin, button_pin = 18, 24

    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    GPIO.setup(led_pin, GPIO.OUT)
    GPIO.setup(button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    print("Button Interrupt Demo")
    print("Press button to toggle LED")

    # Mutable holder so the callback can flip the state without `nonlocal`.
    state = {"lit": False}

    def on_falling_edge(channel):
        """Flip the LED and report the new state."""
        state["lit"] = not state["lit"]
        if state["lit"]:
            GPIO.output(led_pin, GPIO.HIGH)
        else:
            GPIO.output(led_pin, GPIO.LOW)
        print(f"Button pressed - LED {'ON' if state['lit'] else 'OFF'}")

    GPIO.add_event_detect(
        button_pin,
        GPIO.FALLING,
        callback=on_falling_edge,
        bouncetime=200,
    )

    try:
        print("Press Ctrl+C to exit")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nExiting...")
    finally:
        GPIO.cleanup()
# PWM (Pulse Width Modulation)
def pwm_led_dimming():
    """Fade an LED on GPIO 18 up and down with 1000 Hz software PWM.

    Sequence: an announced fade-in and fade-out in 5 % steps, followed by
    three silent "breathing" cycles in 2 % steps. PWM is stopped and the
    GPIO channels are released on exit.
    """
    led_pin = 18

    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    GPIO.setup(led_pin, GPIO.OUT)
    dimmer = GPIO.PWM(led_pin, 1000)

    print("PWM LED Dimming Demo")
    print("LED brightness will fade in and out")

    def ramp(levels, pause, announce):
        """Step the duty cycle through `levels`, sleeping `pause` after each."""
        for level in levels:
            dimmer.ChangeDutyCycle(level)
            if announce:
                print(f"Brightness: {level}%")
            time.sleep(pause)

    try:
        dimmer.start(0)  # begin fully off

        ramp(range(0, 101, 5), 0.1, True)
        time.sleep(1)
        ramp(range(100, -1, -5), 0.1, True)

        print("\nBreathing effect...")
        for _ in range(3):
            ramp(range(0, 101, 2), 0.02, False)
            ramp(range(100, -1, -2), 0.02, False)
    finally:
        dimmer.stop()
        GPIO.cleanup()
# Multiple LED Control
def rgb_led_control():
    """Drive an RGB LED (GPIO 18/19/20) through fixed colours and a hue sweep.

    Each channel runs 1000 Hz software PWM. Seven fixed colours are shown
    for one second each, then the hue is swept through one full turn of the
    colour wheel in 360 steps. PWM is stopped and GPIO channels are released
    on exit.

    The hue sweep uses ``colorsys.hsv_to_rgb``. The previous hand-written
    formula repeated every 180 steps and only reached half brightness on
    mixed colours, so it was not an HSV conversion.
    """
    import colorsys  # stdlib; imported locally because only this demo needs it

    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)

    # RGB LED pins
    RED_PIN = 18
    GREEN_PIN = 19
    BLUE_PIN = 20

    GPIO.setup(RED_PIN, GPIO.OUT)
    GPIO.setup(GREEN_PIN, GPIO.OUT)
    GPIO.setup(BLUE_PIN, GPIO.OUT)

    red_pwm = GPIO.PWM(RED_PIN, 1000)
    green_pwm = GPIO.PWM(GREEN_PIN, 1000)
    blue_pwm = GPIO.PWM(BLUE_PIN, 1000)

    def set_color(r, g, b):
        """Set RGB color (0-255 values), mapped to a 0-100 % duty cycle."""
        red_pwm.ChangeDutyCycle(r * 100 // 255)
        green_pwm.ChangeDutyCycle(g * 100 // 255)
        blue_pwm.ChangeDutyCycle(b * 100 // 255)

    print("RGB LED Color Demo")
    red_pwm.start(0)
    green_pwm.start(0)
    blue_pwm.start(0)

    try:
        colors = [
            (255, 0, 0),      # Red
            (0, 255, 0),      # Green
            (0, 0, 255),      # Blue
            (255, 255, 0),    # Yellow
            (255, 0, 255),    # Magenta
            (0, 255, 255),    # Cyan
            (255, 255, 255),  # White
        ]
        for r, g, b in colors:
            print(f"Color: RGB({r}, {g}, {b})")
            set_color(r, g, b)
            time.sleep(1)

        # Smooth color transition: one full turn of the hue wheel at full
        # saturation and value.
        print("\nSmooth color transition...")
        for hue_degrees in range(360):
            r, g, b = (
                int(255 * channel)
                for channel in colorsys.hsv_to_rgb(hue_degrees / 360, 1.0, 1.0)
            )
            set_color(r, g, b)
            time.sleep(0.05)
    finally:
        red_pwm.stop()
        green_pwm.stop()
        blue_pwm.stop()
        GPIO.cleanup()
# Ultrasonic Sensor (HC-SR04)
def ultrasonic_sensor():
    """Measure distance with an HC-SR04 (TRIG on GPIO 23, ECHO on GPIO 24).

    Every half second a 10 µs trigger pulse is sent and the width of the
    echo pulse is converted to centimetres. If either edge of the echo does
    not arrive within 100 ms the reading is reported as failed, so a stale
    timestamp is never turned into a distance. The loop always waits between
    attempts, including after a failure. Ctrl+C ends the demo; GPIO channels
    are always released.
    """
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)

    TRIG_PIN = 23
    ECHO_PIN = 24
    ECHO_TIMEOUT = 0.1  # seconds to wait for each echo edge

    GPIO.setup(TRIG_PIN, GPIO.OUT)
    GPIO.setup(ECHO_PIN, GPIO.IN)

    def wait_while(level):
        """Spin while ECHO reads `level`.

        Returns the monotonic time at which the level changed, or None if
        it was still unchanged after ECHO_TIMEOUT seconds.
        """
        deadline = time.monotonic() + ECHO_TIMEOUT
        while GPIO.input(ECHO_PIN) == level:
            if time.monotonic() > deadline:
                return None
        return time.monotonic()

    print("HC-SR04 Ultrasonic Sensor Demo")
    print("Measuring distance...")

    try:
        while True:
            # Send 10μs pulse to trigger
            GPIO.output(TRIG_PIN, GPIO.LOW)
            time.sleep(0.00001)
            GPIO.output(TRIG_PIN, GPIO.HIGH)
            time.sleep(0.00001)
            GPIO.output(TRIG_PIN, GPIO.LOW)

            # Rising edge marks the start of the echo, falling edge its end.
            pulse_start = wait_while(GPIO.LOW)
            pulse_end = wait_while(GPIO.HIGH) if pulse_start is not None else None

            if pulse_start is None or pulse_end is None:
                print("Reading failed")
            else:
                pulse_duration = pulse_end - pulse_start
                # Speed of sound = 34300 cm/s; halved for the round trip.
                distance = round(pulse_duration * 17150, 2)
                print(f"Distance: {distance} cm")
                if distance < 10:
                    print("Object detected! Very close!")
                elif distance < 30:
                    print("Object nearby")

            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        GPIO.cleanup()
# DHT11/DHT22 Temperature and Humidity Sensor
def dht_sensor():
    """Print DHT22 temperature and humidity from GPIO 4 every two seconds.

    Requires the Adafruit_DHT package (``pip install Adafruit-DHT``); when
    it is missing, install instructions are printed and the demo returns.
    Ctrl+C ends the loop.
    """
    try:
        import Adafruit_DHT
    except ImportError:
        print("Please install Adafruit_DHT library:")
        print("pip install Adafruit-DHT")
        return

    sensor_model = Adafruit_DHT.DHT22  # Change to DHT11 if using DHT11
    data_pin = 4

    print("DHT22 Temperature & Humidity Sensor Demo")
    print("Reading sensor data...")

    try:
        while True:
            humidity, temperature = Adafruit_DHT.read_retry(sensor_model, data_pin)

            if humidity is None or temperature is None:
                print("Failed to retrieve data from sensor")
            else:
                print(f"Temperature: {temperature:.1f}°C")
                print(f"Humidity: {humidity:.1f}%")

                # Simple commentary on the readings.
                if temperature > 30:
                    print("It's hot!")
                elif temperature < 10:
                    print("It's cold!")

                if humidity > 70:
                    print("High humidity - might rain soon!")
                elif humidity < 30:
                    print("Low humidity - dry air")

                print("-" * 20)

            time.sleep(2)
    except KeyboardInterrupt:
        print("\nStopping...")
# Main function to run all demos
def main():
    """Show the demo menu and run whichever demo(s) the user picks.

    Entering 0 exits, a demo number runs that demo and returns to the
    prompt, and the last option runs every demo in order and then exits.
    Non-numeric input is rejected with a message; Ctrl+C exits.
    """
    demos = [
        ("Basic LED Control", basic_led_control),
        ("GPIOzero LED Demo", gpiozero_led_demo),
        ("Button Input Demo", button_input_demo),
        ("Button Interrupt Demo", button_interrupt_demo),
        ("PWM LED Dimming", pwm_led_dimming),
        ("RGB LED Control", rgb_led_control),
        ("Ultrasonic Sensor", ultrasonic_sensor),
        ("DHT Sensor", dht_sensor),
    ]
    run_all_option = len(demos) + 1

    print("Raspberry Pi GPIO Demo Suite")
    print("=" * 40)
    for number, (title, _) in enumerate(demos, 1):
        print(f"{number}. {title}")
    print(f"{run_all_option}. Run all demos")
    print("0. Exit")

    while True:
        try:
            choice = int(input("\nEnter your choice: "))

            if choice == 0:
                break

            if choice == run_all_option:
                print("\nRunning all demos...")
                for title, demo in demos:
                    print(f"\n=== {title} ===")
                    demo()
                    print(f"\n{title} completed!")
                    time.sleep(1)
                break

            if 1 <= choice <= len(demos):
                title, demo = demos[choice - 1]
                print(f"\n=== {title} ===")
                demo()
                print(f"\n{title} completed!")
            else:
                print("Invalid choice")
        except ValueError:
            print("Please enter a valid number")
        except KeyboardInterrupt:
            print("\nExiting...")
            break


if __name__ == "__main__":
    main()
💻 Продвинутое Управление GPIO на C++ cpp
Высокопроизводительное управление GPIO с использованием C++ и библиотеки pigpio для приложений реального времени, аппаратных прерываний и продвинутого интерфейса датчиков
// Advanced Raspberry Pi GPIO Control with C++
// Compile with: g++ -o gpio_control gpio_control.cpp -lpigpio -lrt -lpthread
// Install pigpio: sudo apt-get install pigpio
#include <chrono>
#include <cmath>
#include <cstdint>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>
#include <signal.h>
#include <unistd.h>
#include <pigpio.h>
// Pin definitions: GPIO numbers handed to the controller classes below
#define LED_PIN 18
#define BUTTON_PIN 24
#define SERVO_PIN 25
#define TRIG_PIN 23
// NOTE(review): ECHO_PIN and BUTTON_PIN are both 24. PiGPIOApp builds a
// ButtonHandler (which enables the pull-up on its pin) and an UltrasonicSensor
// from these values, so the two devices share one GPIO — confirm the wiring
// and give one of them its own pin.
#define ECHO_PIN 24
#define DHT_PIN 4
// Global flag for graceful shutdown.
// sig_atomic_t is the integer type that may be written from a signal handler;
// the demo loops read it as a boolean (non-zero = keep running).
volatile sig_atomic_t running = 1;

// Signal handler for Ctrl+C.
// Prints "\nInterrupt signal (<signum>) received. Shutting down...\n" and
// clears `running`. Only async-signal-safe operations are used: the message
// is assembled in a local buffer and emitted with a single write(), because
// iostream must not be used inside a signal handler.
void signalHandler(int signum) {
    static const char prefix[] = "\nInterrupt signal (";
    static const char suffix[] = ") received. Shutting down...\n";

    char message[sizeof(prefix) + sizeof(suffix) + 16];
    size_t length = 0;

    for (size_t i = 0; i + 1 < sizeof(prefix); ++i) {
        message[length++] = prefix[i];
    }

    // Decimal digits of signum, produced least-significant first.
    char digits[12];
    int digitCount = 0;
    unsigned int value = signum < 0 ? 0u : static_cast<unsigned int>(signum);
    do {
        digits[digitCount++] = static_cast<char>('0' + value % 10);
        value /= 10;
    } while (value != 0);
    while (digitCount > 0) {
        message[length++] = digits[--digitCount];
    }

    for (size_t i = 0; i + 1 < sizeof(suffix); ++i) {
        message[length++] = suffix[i];
    }

    if (write(STDOUT_FILENO, message, length) < 0) {
        // Nothing safe can be done about a failed write inside a handler.
    }
    running = 0;
}
// Owns the pigpio library session: initialize() starts it, and the destructor
// shuts it down again if (and only if) initialisation succeeded.
class GPIOController {
public:
    GPIOController() = default;

    ~GPIOController() {
        if (!initialized) {
            return;
        }
        gpioTerminate();
        std::cout << "GPIO terminated" << std::endl;
    }

    // Calls gpioInitialise(). Returns true on success, false (after logging
    // to stderr) when pigpio reports a negative status.
    bool initialize() {
        const bool ok = gpioInitialise() >= 0;
        if (ok) {
            initialized = true;
            std::cout << "GPIO initialized successfully" << std::endl;
        } else {
            std::cerr << "Failed to initialize GPIO" << std::endl;
        }
        return ok;
    }

private:
    bool initialized = false;  // true once gpioInitialise() has succeeded
};
class LEDController {
private:
int pin;
bool isOn;
public:
LEDController(int ledPin) : pin(ledPin), isOn(false) {
gpioSetMode(pin, PI_OUTPUT);
gpioWrite(pin, 0);
}
void turnOn() {
gpioWrite(pin, 1);
isOn = true;
}
void turnOff() {
gpioWrite(pin, 0);
isOn = false;
}
void toggle() {
isOn ? turnOff() : turnOn();
}
void blink(int times, int onDuration, int offDuration) {
for (int i = 0; i < times; ++i) {
turnOn();
std::this_thread::sleep_for(std::chrono::milliseconds(onDuration));
turnOff();
std::this_thread::sleep_for(std::chrono::milliseconds(offDuration));
}
}
void setPWM(unsigned int dutyCycle) {
gpioPWM(pin, dutyCycle);
}
bool getState() const { return isOn; }
int getPin() const { return pin; }
};
// Drives a hobby servo with pigpio servo pulses (500-2500 µs pulse width,
// mapped linearly onto 0-180 degrees).
class ServoController {
private:
    int pin;
    int minPulseWidth;      // pulse width in µs for 0 degrees
    int maxPulseWidth;      // pulse width in µs for 180 degrees
    int currentPulseWidth;  // last pulse width requested, in µs

public:
    ServoController(int servoPin) : pin(servoPin), minPulseWidth(500), maxPulseWidth(2500), currentPulseWidth(1500) {
        gpioSetMode(pin, PI_OUTPUT);
        gpioSetPWMfrequency(pin, 50);  // 50Hz for servos
        gpioSetPWMrange(pin, 20000);   // 20ms range (20000us)
    }

    // Moves to `angle` degrees. Angles outside 0-180 are clamped so the
    // resulting pulse width always stays within [minPulseWidth, maxPulseWidth].
    void setPosition(int angle) {
        if (angle < 0) {
            angle = 0;
        } else if (angle > 180) {
            angle = 180;
        }
        int pulseWidth = minPulseWidth + (angle * (maxPulseWidth - minPulseWidth)) / 180;
        currentPulseWidth = pulseWidth;
        gpioServo(pin, pulseWidth);
    }

    // Sets the pulse width directly; values outside the supported range are ignored.
    void setMicroseconds(int microseconds) {
        if (microseconds >= minPulseWidth && microseconds <= maxPulseWidth) {
            currentPulseWidth = microseconds;
            gpioServo(pin, microseconds);
        }
    }

    // Moves from startAngle to endAngle in `steps` equal increments, pausing
    // delayMs after each position (steps + 1 positions in total).
    // Each angle is interpolated from the step index instead of adding a
    // truncated integer step size: the old form looped forever when
    // |endAngle - startAngle| < steps and divided by zero when steps == 0.
    // With steps <= 0 the servo simply jumps to endAngle.
    void sweep(int startAngle, int endAngle, int steps, int delayMs) {
        if (steps <= 0) {
            setPosition(endAngle);
            return;
        }
        for (int i = 0; i <= steps; ++i) {
            const int angle = startAngle + ((endAngle - startAngle) * i) / steps;
            setPosition(angle);
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
        }
    }

    int getCurrentPulseWidth() const { return currentPulseWidth; }
};
// Polled push-button (active low, internal pull-up) that invokes a callback
// once per press after a debounce delay.
class ButtonHandler {
public:
    ButtonHandler(int buttonPin, std::function<void()> cb, int debounce = 50)
        : pin(buttonPin), callback(cb), lastState(false), debounceMs(debounce) {
        gpioSetMode(pin, PI_INPUT);
        gpioSetPullUpDown(pin, PI_PUD_UP);
    }

    // Call repeatedly. When the button goes from released to pressed, waits
    // debounceMs and fires the callback if the button is still held down.
    void check() {
        const bool pressedNow = isPressed();
        const bool newPress = pressedNow && !lastState;

        if (newPress) {
            std::this_thread::sleep_for(std::chrono::milliseconds(debounceMs));
            if (isPressed()) {
                callback();
            }
        }

        lastState = pressedNow;
    }

private:
    // The pin reads 0 while the button is held (active low).
    bool isPressed() const { return gpioRead(pin) == 0; }

    int pin;
    std::function<void()> callback;
    bool lastState;  // pressed state seen by the previous check()
    int debounceMs;
};
// HC-SR04 style distance sensor: one trigger output and one echo input.
class UltrasonicSensor {
private:
    using Clock = std::chrono::high_resolution_clock;

    int triggerPin;
    int echoPin;

    // Spins while the echo line reads `level`. Returns false as soon as more
    // than 30 ms (30000 µs) have passed since `since`, true once the level changes.
    bool waitWhileEcho(int level, Clock::time_point since) const {
        while (gpioRead(echoPin) == level) {
            const auto waited =
                std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - since);
            if (waited.count() > 30000) {
                return false;
            }
        }
        return true;
    }

public:
    UltrasonicSensor(int trigPin, int echoGpio) : triggerPin(trigPin), echoPin(echoGpio) {
        gpioSetMode(triggerPin, PI_OUTPUT);
        gpioSetMode(echoPin, PI_INPUT);
        gpioWrite(triggerPin, 0);
    }

    // Triggers one measurement and returns the distance in centimetres,
    // or -1 when the echo does not start or does not end within 30 ms.
    double getDistance() {
        // Trigger pulse
        gpioWrite(triggerPin, 1);
        std::this_thread::sleep_for(std::chrono::microseconds(10));
        gpioWrite(triggerPin, 0);

        // Echo start (line goes high)
        if (!waitWhileEcho(0, Clock::now())) {
            return -1;
        }

        // Echo end (line goes low again)
        const auto echoStart = Clock::now();
        if (!waitWhileEcho(1, echoStart)) {
            return -1;
        }
        const auto echoMicros =
            std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - echoStart);

        // Speed of sound = 343 m/s = 0.0343 cm/µs; halved for the round trip.
        return (echoMicros.count() * 0.0343) / 2;
    }

    // True when a valid reading is closer than `threshold` centimetres.
    bool isObjectNear(double threshold = 30.0) {
        const double measured = getDistance();
        if (measured <= 0) {
            return false;
        }
        return measured < threshold;
    }
};
// DHT22 Reader (simplified implementation)
class DHT22Reader {
private:
int pin;
std::vector<int> bitTimings;
public:
DHT22Reader(int dhtPin) : pin(dhtPin) {
gpioSetMode(pin, PI_INPUT);
gpioSetPullUpDown(pin, PI_PUD_UP);
}
struct DHT22Data {
float temperature;
float humidity;
bool valid;
DHT22Data() : temperature(0), humidity(0), valid(false) {}
};
DHT22Data readData() {
DHT22Data data;
// Send start signal
gpioSetMode(pin, PI_OUTPUT);
gpioWrite(pin, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
gpioWrite(pin, 1);
std::this_thread::sleep_for(std::chrono::microseconds(30));
// Switch to input and read response
gpioSetMode(pin, PI_INPUT);
// Wait for DHT response
int timeout = 1000;
while (gpioRead(pin) == 1 && timeout-- > 0) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
if (timeout <= 0) {
return data; // No response
}
// Wait for start signal
timeout = 1000;
while (gpioRead(pin) == 0 && timeout-- > 0) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
timeout = 1000;
while (gpioRead(pin) == 1 && timeout-- > 0) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// Read 40 bits
std::vector<int> bits;
for (int i = 0; i < 40; ++i) {
// Wait for bit start
timeout = 1000;
while (gpioRead(pin) == 0 && timeout-- > 0) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// Measure high duration
int highCount = 0;
while (gpioRead(pin) == 1 && highCount < 100) {
highCount++;
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
bits.push_back(highCount > 40 ? 1 : 0);
}
// Convert bits to bytes
if (bits.size() == 40) {
int humidity = 0, temp = 0;
int checksum = 0;
for (int i = 0; i < 16; ++i) {
humidity = (humidity << 1) | bits[i];
}
for (int i = 16; i < 32; ++i) {
temp = (temp << 1) | bits[i];
}
for (int i = 32; i < 40; ++i) {
checksum = (checksum << 1) | bits[i];
}
// Verify checksum
int calcChecksum = ((humidity >> 8) + (humidity & 0xFF) +
(temp >> 8) + (temp & 0xFF)) & 0xFF;
if (calcChecksum == checksum) {
data.humidity = humidity / 10.0;
// Handle negative temperature
if (temp & 0x8000) {
temp = -(temp & 0x7FFF);
}
data.temperature = temp / 10.0;
data.valid = true;
}
}
return data;
}
};
// Advanced LED Effects Class
class LEDEffects {
private:
LEDController redLed;
LEDController greenLed;
LEDController blueLed;
public:
LEDEffects() : redLed(17), greenLed(27), blueLed(22) {}
void colorCycle(int cycles, int delayMs) {
for (int cycle = 0; cycle < cycles; ++cycle) {
// Red to Green
for (int i = 0; i <= 100; ++i) {
redLed.setPWM(100 - i);
greenLed.setPWM(i);
blueLed.setPWM(0);
std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
}
// Green to Blue
for (int i = 0; i <= 100; ++i) {
redLed.setPWM(0);
greenLed.setPWM(100 - i);
blueLed.setPWM(i);
std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
}
// Blue to Red
for (int i = 0; i <= 100; ++i) {
redLed.setPWM(i);
greenLed.setPWM(0);
blueLed.setPWM(100 - i);
std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
}
}
}
void policeLight(int duration) {
auto endTime = std::chrono::steady_clock::now() + std::chrono::seconds(duration);
while (std::chrono::steady_clock::now() < endTime && running) {
redLed.setPWM(100);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
redLed.setPWM(0);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
blueLed.setPWM(100);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
blueLed.setPWM(0);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
void breathingEffect(int cycles, int speed) {
for (int cycle = 0; cycle < cycles; ++cycle) {
for (int i = 0; i <= 100; ++i) {
int brightness = static_cast<int>(100 * (1 + std::sin(i * 3.14159 / 50)) / 2);
redLed.setPWM(brightness);
greenLed.setPWM(brightness);
blueLed.setPWM(brightness);
std::this_thread::sleep_for(std::chrono::milliseconds(speed));
}
}
}
void turnOffAll() {
redLed.setPWM(0);
greenLed.setPWM(0);
blueLed.setPWM(0);
}
};
// Main application class
// Owns the pigpio session and every peripheral wrapper, and drives the
// interactive demo menu.
class PiGPIOApp {
private:
    GPIOController gpio;
    // Result of gpio.initialize(). Declared directly after `gpio` and before
    // every peripheral: members are initialised in declaration order, so
    // pigpio is up before any wrapper constructor calls gpioSetMode() or
    // gpioWrite(). Previously those constructors ran before gpioInitialise()
    // and their pin setup was lost.
    bool gpioReady;
    LEDController statusLed;
    ServoController servo;
    UltrasonicSensor ultrasonic;
    DHT22Reader dht22;
    LEDEffects effects;
    ButtonHandler button;

    // Button callback
    void onButtonPress() {
        std::cout << "Button pressed! Toggling LED..." << std::endl;
        statusLed.toggle();
    }

public:
    PiGPIOApp()
        : gpioReady(gpio.initialize()),
          statusLed(LED_PIN), servo(SERVO_PIN), ultrasonic(TRIG_PIN, ECHO_PIN),
          dht22(DHT_PIN), button(BUTTON_PIN, [this]() { this->onButtonPress(); }) {}

    // Reports whether GPIO initialisation (performed during construction)
    // succeeded. Returns false if pigpio could not be initialised.
    bool initialize() {
        if (!gpioReady) {
            return false;
        }
        std::cout << "GPIO Application initialized successfully" << std::endl;
        return true;
    }

    // Shows the menu and runs the selected demo until the user chooses 0,
    // input ends, or the global `running` flag is cleared.
    void runDemo() {
        std::cout << "\n=== Raspberry Pi GPIO Advanced Demo ===" << std::endl;
        while (running) {
            std::cout << "\nAvailable demos:" << std::endl;
            std::cout << "1. LED Blink Patterns" << std::endl;
            std::cout << "2. Servo Control" << std::endl;
            std::cout << "3. Ultrasonic Distance Measurement" << std::endl;
            std::cout << "4. DHT22 Temperature/Humidity" << std::endl;
            std::cout << "5. RGB LED Effects" << std::endl;
            std::cout << "6. Button Interactive Demo" << std::endl;
            std::cout << "7. Distance-based Servo Control" << std::endl;
            std::cout << "0. Exit" << std::endl;

            int choice;
            std::cout << "Enter your choice: ";
            if (!(std::cin >> choice)) {
                if (std::cin.eof()) {
                    // No more input: leave instead of spinning on a dead stream.
                    running = false;
                    break;
                }
                // Non-numeric input: reset the stream and discard the rest of
                // the line, rather than treating the failed read as "0. Exit".
                std::cin.clear();
                std::cin.ignore(1024, '\n');
                std::cout << "Invalid choice!" << std::endl;
                continue;
            }

            switch (choice) {
                case 1:
                    ledDemo();
                    break;
                case 2:
                    servoDemo();
                    break;
                case 3:
                    ultrasonicDemo();
                    break;
                case 4:
                    dht22Demo();
                    break;
                case 5:
                    rgbEffectsDemo();
                    break;
                case 6:
                    buttonDemo();
                    break;
                case 7:
                    distanceServoDemo();
                    break;
                case 0:
                    running = false;
                    break;
                default:
                    std::cout << "Invalid choice!" << std::endl;
            }
        }
    }

private:
    // Blinks the status LED five times, then fades it up and down with PWM.
    void ledDemo() {
        std::cout << "\n--- LED Demo ---" << std::endl;
        std::cout << "Blinking LED 5 times..." << std::endl;
        statusLed.blink(5, 500, 500);

        std::cout << "PWM fade effect..." << std::endl;
        for (int i = 0; i <= 100; i += 10) {
            statusLed.setPWM(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        for (int i = 100; i >= 0; i -= 10) {
            statusLed.setPWM(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        statusLed.turnOff();
    }

    // Sweeps the servo 0 -> 180 -> 0 degrees and parks it at 90.
    void servoDemo() {
        std::cout << "\n--- Servo Demo ---" << std::endl;
        std::cout << "Sweeping servo from 0 to 180 degrees..." << std::endl;
        servo.sweep(0, 180, 18, 50);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        std::cout << "Sweeping back from 180 to 0 degrees..." << std::endl;
        servo.sweep(180, 0, 18, 50);

        std::cout << "Setting servo to 90 degrees..." << std::endl;
        servo.setPosition(90);
    }

    // Prints the measured distance twice a second for 10 seconds; the status
    // LED is lit while an object is closer than 10 cm.
    void ultrasonicDemo() {
        std::cout << "\n--- Ultrasonic Sensor Demo ---" << std::endl;
        std::cout << "Measuring distance for 10 seconds..." << std::endl;
        std::cout << "Press Ctrl+C to stop" << std::endl;

        auto endTime = std::chrono::steady_clock::now() + std::chrono::seconds(10);
        while (std::chrono::steady_clock::now() < endTime && running) {
            double distance = ultrasonic.getDistance();
            if (distance > 0) {
                std::cout << "Distance: " << distance << " cm" << std::endl;
                if (distance < 10) {
                    statusLed.turnOn();
                } else {
                    statusLed.turnOff();
                }
            } else {
                std::cout << "Reading failed" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
        statusLed.turnOff();
    }

    // Reads the DHT22 every two seconds for 30 seconds.
    void dht22Demo() {
        std::cout << "\n--- DHT22 Sensor Demo ---" << std::endl;
        std::cout << "Reading temperature and humidity..." << std::endl;
        std::cout << "Press Ctrl+C to stop" << std::endl;

        auto endTime = std::chrono::steady_clock::now() + std::chrono::seconds(30);
        while (std::chrono::steady_clock::now() < endTime && running) {
            auto data = dht22.readData();
            if (data.valid) {
                std::cout << "Temperature: " << data.temperature << "°C, ";
                std::cout << "Humidity: " << data.humidity << "%" << std::endl;
            } else {
                std::cout << "Failed to read DHT22 data" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        }
    }

    // Runs the colour cycle, police light and breathing effects in turn.
    void rgbEffectsDemo() {
        std::cout << "\n--- RGB LED Effects Demo ---" << std::endl;
        std::cout << "Color cycling..." << std::endl;
        effects.colorCycle(2, 10);

        std::cout << "Police lights..." << std::endl;
        effects.policeLight(3);

        std::cout << "Breathing effect..." << std::endl;
        effects.breathingEffect(3, 20);
        effects.turnOffAll();
    }

    // Polls the button every 10 ms for 30 seconds; each press toggles the LED.
    void buttonDemo() {
        std::cout << "\n--- Button Demo ---" << std::endl;
        std::cout << "Press the button to toggle the LED..." << std::endl;
        std::cout << "Press Ctrl+C to stop" << std::endl;

        auto endTime = std::chrono::steady_clock::now() + std::chrono::seconds(30);
        while (std::chrono::steady_clock::now() < endTime && running) {
            button.check();
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        statusLed.turnOff();
    }

    // For 30 seconds, maps distances of up to 50 cm linearly onto 0-180
    // degrees of servo travel; readings outside that range are ignored.
    void distanceServoDemo() {
        std::cout << "\n--- Distance-based Servo Control Demo ---" << std::endl;
        std::cout << "Servo will move based on distance measurement" << std::endl;
        std::cout << "Press Ctrl+C to stop" << std::endl;

        auto endTime = std::chrono::steady_clock::now() + std::chrono::seconds(30);
        while (std::chrono::steady_clock::now() < endTime && running) {
            double distance = ultrasonic.getDistance();
            if (distance > 0 && distance <= 50) {
                // Map distance (0-50cm) to servo angle (0-180 degrees)
                int angle = static_cast<int>((distance / 50.0) * 180.0);
                servo.setPosition(angle);
                std::cout << "Distance: " << distance << " cm -> Servo: " << angle << "°" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
};
// Program entry point: brings up the application, installs the Ctrl+C
// handler and runs the demo menu. Returns 0 on a clean exit and 1 when
// initialisation fails or an exception escapes.
int main() {
    try {
        PiGPIOApp app;
        if (!app.initialize()) {
            std::cerr << "Failed to initialize application" << std::endl;
            return 1;
        }

        // Set up the signal handler only after GPIO initialisation:
        // gpioInitialise() installs pigpio's own signal handlers, which would
        // replace a SIGINT handler registered earlier and terminate the
        // process instead of clearing `running`.
        signal(SIGINT, signalHandler);

        app.runDemo();
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }
    std::cout << "Application exited gracefully" << std::endl;
    return 0;
}
💻 Проект IoT Метеорологической Станции python
Полная система метеорологической станции IoT с несколькими датчиками, ведением логов данных, веб-панелью и облачной интеграцией для мониторинга погоды в реальном времени
# Raspberry Pi IoT Weather Station
# Complete weather monitoring system with web dashboard and cloud integration
import time
import json
import sqlite3
import threading
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional
import requests
import paho.mqtt.client as mqtt
import logging
from pathlib import Path
# GPIO and sensor libraries
try:
import RPi.GPIO as GPIO
import Adafruit_DHT
import smbus2
from w1thermsensor import W1ThermSensor
except ImportError as e:
print(f"Missing library: {e}")
print("Install with: pip install RPi.GPIO Adafruit-DHT smbus2 w1thermsensor paho-mqtt requests")
exit(1)
# Configuration
# Station-wide settings, grouped by subsystem.
CONFIG = {
    # Pin / bus assignments for the attached sensors.
    'sensors': {
        'dht_pin': 4,
        'bmp_i2c_address': 0x76,  # BMP280 or BME280
        'light_pin': 18,
        'rain_pin': 23,
        'wind_speed_pin': 24,
        'wind_direction_pin': 25,
        'one_wire_sensors': True,
    },
    # Local SQLite storage.
    'database': {
        'path': 'weather_station.db',
        'retention_days': 30,
    },
    # MQTT publishing; credentials are only applied when both are non-empty.
    'mqtt': {
        'broker': 'localhost',
        'port': 1883,
        'username': '',
        'password': '',
        'topic': 'weather/station',
        'enabled': True,
    },
    'web_server': {
        'host': '0.0.0.0',
        'port': 8080,
        'enabled': True,
    },
    # Cloud upload targets; both are disabled and carry placeholder keys.
    'cloud_apis': {
        'weathercloud': {
            'url': 'https://api.weathercloud.com/v1/submit',
            'api_key': 'YOUR_API_KEY',
            'enabled': False,
        },
        'thingspeak': {
            'url': 'https://api.thingspeak.com/update',
            'api_key': 'YOUR_THINGSPEAK_KEY',
            'enabled': False,
        },
    },
    'readings': {
        'interval_seconds': 30,
        'averaging_window': 5,  # Take 5 readings and average
    },
}
# Logging setup
# Log to both a file (weather_station.log) and the console.
_log_handlers = [
    logging.FileHandler('weather_station.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger('WeatherStation')
@dataclass
class WeatherData:
    """One set of weather station measurements taken at `timestamp`.

    `uv_index` and `battery_voltage` are optional and default to 0.0.
    """

    # When the reading was taken
    timestamp: datetime

    # Atmospheric measurements
    temperature: float
    humidity: float
    pressure: float

    # Light and rain
    light_level: float
    rain_detected: bool

    # Wind
    wind_speed: float
    wind_direction: float

    # Optional extras
    uv_index: float = 0.0
    battery_voltage: float = 0.0
class DatabaseManager:
    """SQLite storage for weather readings and daily summaries.

    Every operation opens its own connection, commits on success (rolls back
    on error) and always closes the connection afterwards.
    """

    def __init__(self, db_path: str):
        """Remember `db_path` and create the tables if they do not exist."""
        self.db_path = db_path
        self.init_database()

    def _execute(self, sql: str, params: tuple = (), fetch: Optional[str] = None):
        """Run one statement on a fresh connection.

        `fetch` selects the result: ``'all'`` returns a list of row dicts,
        ``'one'`` returns a single row dict or None, anything else returns
        None. ``with conn`` only ends the transaction, so the connection is
        closed explicitly in ``finally``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            with conn:
                cursor = conn.execute(sql, params)
                if fetch == 'all':
                    return [dict(row) for row in cursor.fetchall()]
                if fetch == 'one':
                    row = cursor.fetchone()
                    return dict(row) if row else None
                return None
        finally:
            conn.close()

    @staticmethod
    def _cutoff(**age) -> str:
        """Return 'now minus `age`' as 'YYYY-MM-DD HH:MM:SS.ffffff' text.

        NOTE(review): this uses naive local time and so assumes stored
        timestamps come from ``datetime.now()``; confirm against the code
        that builds WeatherData.
        """
        return (datetime.now() - timedelta(**age)).isoformat(sep=' ')

    def init_database(self):
        """Initialize SQLite database"""
        self._execute('''
            CREATE TABLE IF NOT EXISTS weather_readings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                temperature REAL,
                humidity REAL,
                pressure REAL,
                light_level REAL,
                rain_detected BOOLEAN,
                wind_speed REAL,
                wind_direction REAL,
                uv_index REAL,
                battery_voltage REAL
            )
        ''')
        self._execute('''
            CREATE TABLE IF NOT EXISTS daily_summary (
                date DATE PRIMARY KEY,
                avg_temperature REAL,
                max_temperature REAL,
                min_temperature REAL,
                avg_humidity REAL,
                avg_pressure REAL,
                total_rain_minutes INTEGER,
                max_wind_speed REAL,
                readings_count INTEGER
            )
        ''')
        logger.info("Database initialized")

    def insert_reading(self, data: "WeatherData"):
        """Insert weather reading into database"""
        self._execute('''
            INSERT INTO weather_readings
            (timestamp, temperature, humidity, pressure, light_level,
             rain_detected, wind_speed, wind_direction, uv_index, battery_voltage)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            data.timestamp.isoformat(),
            data.temperature,
            data.humidity,
            data.pressure,
            data.light_level,
            data.rain_detected,
            data.wind_speed,
            data.wind_direction,
            data.uv_index,
            data.battery_voltage,
        ))

    def get_recent_readings(self, hours: int = 24) -> List[Dict]:
        """Return readings from the last `hours` hours, newest first.

        Timestamps are stored as ISO text with a 'T' separator, while SQLite's
        ``datetime('now', ...)`` yields a space-separated UTC string, so a raw
        string comparison between the two is wrong. Both sides go through
        SQLite's ``datetime()`` and the cutoff is a bound parameter.
        """
        return self._execute('''
            SELECT * FROM weather_readings
            WHERE datetime(timestamp) > datetime(?)
            ORDER BY timestamp DESC
        ''', (self._cutoff(hours=hours),), fetch='all')

    def get_daily_summary(self, date: str) -> Optional[Dict]:
        """Return the daily_summary row for `date`, or None if there is none."""
        return self._execute('''
            SELECT * FROM daily_summary WHERE date = ?
        ''', (date,), fetch='one')

    def cleanup_old_data(self, retention_days: int):
        """Delete readings older than `retention_days` days."""
        self._execute('''
            DELETE FROM weather_readings
            WHERE datetime(timestamp) < datetime(?)
        ''', (self._cutoff(days=retention_days),))
        logger.info(f"Cleaned up data older than {retention_days} days")
class MQTTPublisher:
    """Publishes weather readings as JSON to an MQTT topic.

    `config` supplies 'broker', 'port' and 'topic', plus optional 'username'
    and 'password' (only applied when both are non-empty).
    """

    def __init__(self, config: Dict):
        self.config = config
        self.client = self._create_client()
        self.connected = False
        if config.get('username') and config.get('password'):
            self.client.username_pw_set(config['username'], config['password'])
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect

    @staticmethod
    def _create_client():
        """Create the paho client in a way that works on paho-mqtt 1.x and 2.x.

        paho-mqtt 2.x requires a callback API version. VERSION1 is requested
        because the callbacks below use the 1.x signatures. On 1.x the enum
        does not exist and the plain constructor is used.
        """
        api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_versions is not None:
            return mqtt.Client(api_versions.VERSION1)
        return mqtt.Client()

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: rc == 0 means the broker accepted us."""
        if rc == 0:
            self.connected = True
            logger.info("Connected to MQTT broker")
        else:
            logger.error(f"Failed to connect to MQTT broker: {rc}")

    def on_disconnect(self, client, userdata, rc):
        """Disconnection callback: mark the publisher as offline."""
        self.connected = False
        logger.warning(f"Disconnected from MQTT broker: {rc}")

    def connect(self):
        """Connect to MQTT broker and start the network loop.

        Returns True when the connection attempt was started, False if it
        raised. `self.connected` only becomes True later, in on_connect.
        """
        try:
            self.client.connect(self.config['broker'], self.config['port'], 60)
            self.client.loop_start()
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def publish_data(self, data: "WeatherData"):
        """Publish weather data to MQTT topic.

        Returns False without publishing while disconnected, otherwise
        whether the client accepted the message (QoS 1, not retained).
        """
        if not self.connected:
            return False
        try:
            payload = json.dumps({
                'timestamp': data.timestamp.isoformat(),
                'temperature': data.temperature,
                'humidity': data.humidity,
                'pressure': data.pressure,
                'light': data.light_level,
                'rain': data.rain_detected,
                'wind_speed': data.wind_speed,
                'wind_direction': data.wind_direction
            })
            result = self.client.publish(
                self.config['topic'],
                payload,
                qos=1,
                retain=False
            )
            return result.rc == mqtt.MQTT_ERR_SUCCESS
        except Exception as e:
            logger.error(f"Failed to publish MQTT data: {e}")
            return False
class CloudPublisher:
    """Uploads readings to the cloud services described in `config`.

    `config` maps a service name ('weathercloud', 'thingspeak') to a dict
    with 'url', 'api_key' and 'enabled'. One HTTP session with a fixed
    User-Agent is shared by all uploads.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'RaspberryPi-WeatherStation/1.0'
        })

    def publish_to_weathercloud(self, data: WeatherData):
        """Publish data to WeatherCloud API as JSON.

        Returns True on HTTP 200; False when the service is disabled, the
        API answers with another status, or the request raises.
        """
        settings = self.config['weathercloud']
        if not settings['enabled']:
            return False

        try:
            body = {
                'api_key': settings['api_key'],
                'timestamp': data.timestamp.isoformat(),
                'temperature_c': data.temperature,
                'humidity_percent': data.humidity,
                'pressure_hpa': data.pressure,
                'light_lux': data.light_level,
                'wind_speed_ms': data.wind_speed,
                'wind_direction_deg': data.wind_direction,
                'rain_detected': data.rain_detected,
                'uv_index': data.uv_index
            }
            response = self.session.post(settings['url'], json=body, timeout=10)

            if response.status_code != 200:
                logger.error(f"WeatherCloud API error: {response.status_code} - {response.text}")
                return False

            logger.info("Data published to WeatherCloud successfully")
            return True
        except Exception as e:
            logger.error(f"WeatherCloud publishing failed: {e}")
            return False

    def publish_to_thingspeak(self, data: WeatherData):
        """Publish data to ThingSpeak as form fields field1..field8.

        Returns True on HTTP 200; False when the service is disabled, the
        API answers with another status, or the request raises.
        """
        settings = self.config['thingspeak']
        if not settings['enabled']:
            return False

        try:
            fields = {
                'api_key': settings['api_key'],
                'field1': data.temperature,
                'field2': data.humidity,
                'field3': data.pressure,
                'field4': data.light_level,
                'field5': data.wind_speed,
                'field6': data.wind_direction,
                'field7': 1 if data.rain_detected else 0,
                'field8': data.uv_index
            }
            response = self.session.post(settings['url'], data=fields, timeout=10)

            if response.status_code != 200:
                logger.error(f"ThingSpeak API error: {response.status_code} - {response.text}")
                return False

            logger.info("Data published to ThingSpeak successfully")
            return True
        except Exception as e:
            logger.error(f"ThingSpeak publishing failed: {e}")
            return False
class SensorManager:
    """Configure the station's GPIO pins and sensor handles, and sample them.

    Only the ``'sensors'`` section of the station configuration is kept.
    Each optional sensor is enabled by the presence of its key in that
    section: ``rain_pin``, ``light_pin``, ``wind_speed_pin``,
    ``wind_direction_pin``, ``dht_pin``, ``bmp_i2c_address`` and
    ``one_wire_sensors``.

    Several readers (pressure, light, wind speed, wind direction) are demo
    stand-ins that return simulated values rather than talking to hardware.
    """

    # Pressure reported when no pressure sample could be taken (hPa).
    DEFAULT_PRESSURE_HPA = 1013.25

    def __init__(self, config: Dict):
        self.config = config['sensors']
        # Assign defaults first so every read_* method is safe to call even
        # when a sensor is missing from the config or setup fails part-way.
        self.dht_sensor = None
        self.bmp_i2c = None
        self.bmp_address = None
        self.temp_sensors = []
        self.setup_gpio()
        self.setup_sensors()

    def setup_gpio(self):
        """Put the configured sensor pins into input mode (BCM numbering)."""
        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)

        # (config key, needs internal pull-up).  The light sensor would be
        # better served by an ADC such as the MCP3008; a digital pin is used
        # for the demo.
        pin_specs = (
            ('rain_pin', True),
            ('light_pin', False),
            ('wind_speed_pin', True),
            ('wind_direction_pin', False),
        )
        for key, pull_up in pin_specs:
            if key not in self.config:
                continue
            if pull_up:
                GPIO.setup(self.config[key], GPIO.IN, pull_up_down=GPIO.PUD_UP)
            else:
                GPIO.setup(self.config[key], GPIO.IN)

    def setup_sensors(self):
        """Create the sensor handles for every sensor named in the config.

        On any failure the error is logged and all handles are reset, so the
        station keeps running with those sensors disabled.
        """
        try:
            # DHT22 sensor
            if 'dht_pin' in self.config:
                self.dht_sensor = Adafruit_DHT.DHT22

            # BMP280/BME280 sensor (pressure)
            if 'bmp_i2c_address' in self.config:
                self.bmp_i2c = smbus2.SMBus(1)
                self.bmp_address = self.config['bmp_i2c_address']

            # DS18B20 temperature sensors
            if self.config.get('one_wire_sensors', False):
                self.temp_sensors = W1ThermSensor.get_available_sensors()
                logger.info(f"Found {len(self.temp_sensors)} temperature sensors")
            else:
                self.temp_sensors = []
        except Exception as e:
            logger.error(f"Sensor setup failed: {e}")
            self.dht_sensor = None
            self.bmp_i2c = None
            self.bmp_address = None
            self.temp_sensors = []

    def read_dht22(self) -> Optional[tuple]:
        """Return ``(temperature, humidity)`` from the DHT22, or None.

        None is returned when the sensor is not configured, when the read
        yields no values, or when the read raises.
        """
        sensor = getattr(self, 'dht_sensor', None)
        if not sensor:
            return None
        try:
            humidity, temperature = Adafruit_DHT.read_retry(
                sensor,
                self.config['dht_pin'],
                retries=3,
                delay_seconds=2
            )
            if humidity is not None and temperature is not None:
                return (temperature, humidity)
        except Exception as e:
            logger.error(f"DHT22 read error: {e}")
        return None

    def read_bmp280(self) -> Optional[float]:
        """Return a pressure value in hPa, or None when no bus is available.

        This is a demo stand-in: it returns a simulated value around the
        default pressure instead of querying the chip.
        """
        # ``is None`` rather than ``hasattr``: a failed setup leaves the
        # attribute present but set to None, which must count as "no sensor".
        if getattr(self, 'bmp_i2c', None) is None:
            return None
        try:
            # A real implementation would need a proper BMP280 library.
            import random
            return self.DEFAULT_PRESSURE_HPA + random.uniform(-10, 10)
        except Exception as e:
            logger.error(f"BMP280 read error: {e}")
            return None

    def read_light_level(self) -> float:
        """Return a simulated light level in the range 0-100.

        Returns 0.0 when no light pin is configured or on error.
        """
        if 'light_pin' not in self.config:
            return 0.0
        try:
            # This would normally use an ADC for an analog light sensor.
            import random
            hour = datetime.now().hour
            base_light = max(0, 100 - abs(12 - hour) * 8)  # Peak at noon
            variation = random.uniform(-20, 20)
            return max(0, min(100, base_light + variation))
        except Exception as e:
            logger.error(f"Light sensor read error: {e}")
            return 0.0

    def read_rain_sensor(self) -> bool:
        """Return True when the rain pin reads LOW; False if unconfigured or on error."""
        if 'rain_pin' not in self.config:
            return False
        try:
            return GPIO.input(self.config['rain_pin']) == GPIO.LOW
        except Exception as e:
            logger.error(f"Rain sensor read error: {e}")
            return False

    def read_wind_speed(self) -> float:
        """Return a simulated wind speed (0-20 m/s); 0.0 if unconfigured or on error."""
        if 'wind_speed_pin' not in self.config:
            return 0.0
        try:
            # This would normally count pulses over time.
            import random
            return random.uniform(0, 20)  # 0-20 m/s
        except Exception as e:
            logger.error(f"Wind speed sensor read error: {e}")
            return 0.0

    def read_wind_direction(self) -> float:
        """Return a simulated wind direction (0-360 degrees); 0.0 if unconfigured or on error."""
        if 'wind_direction_pin' not in self.config:
            return 0.0
        try:
            # This would normally use an ADC or multiple digital inputs.
            import random
            return random.uniform(0, 360)  # 0-360 degrees
        except Exception as e:
            logger.error(f"Wind direction sensor read error: {e}")
            return 0.0

    def read_temperature_sensors(self) -> List[float]:
        """Return the temperature of every DS18B20 that could be read.

        Sensors whose read raises are logged and left out of the result.
        """
        temperatures = []
        for sensor in getattr(self, 'temp_sensors', []):
            try:
                temperatures.append(sensor.get_temperature())
            except Exception as e:
                logger.error(f"Temperature sensor read error: {e}")
        return temperatures

    @staticmethod
    def _circular_mean_degrees(angles) -> float:
        """Return the mean direction of ``angles`` (degrees) in [0, 360).

        Directions wrap around, so they are averaged as unit vectors; a plain
        arithmetic mean would turn 350 and 10 into 180.  Returns 0.0 for an
        empty input or when the directions cancel each other out.
        """
        import math
        if not angles:
            return 0.0
        x = sum(math.cos(math.radians(a)) for a in angles)
        y = sum(math.sin(math.radians(a)) for a in angles)
        if math.hypot(x, y) < 1e-9:
            return 0.0
        return math.degrees(math.atan2(y, x)) % 360.0

    def read_all_sensors(self) -> Optional[WeatherData]:
        """Sample every sensor several times and return the averaged reading.

        The number of samples is ``CONFIG['readings']['averaging_window']``.
        Returns None when no DHT22 sample succeeded, since temperature and
        humidity cannot be reported in that case.
        """
        window = CONFIG['readings']['averaging_window']
        dht_samples = 0
        pressure_samples = 0
        temp_sum = 0.0
        humidity_sum = 0.0
        pressure_sum = 0.0
        light_sum = 0.0
        wind_speed_sum = 0.0
        wind_directions = []
        rain_count = 0

        # Take multiple readings for averaging
        for _ in range(window):
            dht_data = self.read_dht22()
            if dht_data:
                temp_sum += dht_data[0]
                humidity_sum += dht_data[1]
                dht_samples += 1

            pressure = self.read_bmp280()
            if pressure:
                pressure_sum += pressure
                pressure_samples += 1

            light_sum += self.read_light_level()
            wind_speed_sum += self.read_wind_speed()
            wind_directions.append(self.read_wind_direction())

            if self.read_rain_sensor():
                rain_count += 1

            time.sleep(0.1)  # Small delay between readings

        if dht_samples == 0:
            return None

        temperature = temp_sum / dht_samples
        humidity = humidity_sum / dht_samples
        # Divide by the samples actually taken, not by the window size, so a
        # few failed reads do not drag the average down.
        if pressure_samples:
            pressure = pressure_sum / pressure_samples
        else:
            pressure = self.DEFAULT_PRESSURE_HPA
        light_level = light_sum / window
        wind_speed = wind_speed_sum / window
        wind_direction = self._circular_mean_degrees(wind_directions)
        rain_detected = rain_count > window // 2

        # Blend in the DS18B20 readings when there are any.
        ds18b20_temps = self.read_temperature_sensors()
        if ds18b20_temps:
            temperature = (temperature + sum(ds18b20_temps) / len(ds18b20_temps)) / 2

        return WeatherData(
            timestamp=datetime.now(),
            temperature=round(temperature, 2),
            humidity=round(humidity, 2),
            pressure=round(pressure, 2),
            light_level=round(light_level, 2),
            rain_detected=rain_detected,
            wind_speed=round(wind_speed, 2),
            # Rounding can push 359.999 up to 360.0; fold that back to 0.0.
            wind_direction=round(wind_direction, 2) % 360.0,
            uv_index=0.0,  # Would need UV sensor
            battery_voltage=0.0  # Would need battery monitoring
        )
class WeatherStation:
    """Tie sensors, storage and publishers together and run them in the background.

    ``start()`` launches two daemon threads: one that samples the sensors and
    stores/publishes each reading, and one that prunes old database rows.
    ``stop()`` asks both to finish, waits briefly for them, then releases
    MQTT and GPIO.
    """

    # Upper bound on how long stop() waits for each worker thread (seconds).
    SHUTDOWN_TIMEOUT = 10

    def __init__(self):
        self.sensor_manager = SensorManager(CONFIG)
        self.database = DatabaseManager(CONFIG['database']['path'])
        self.mqtt_publisher = MQTTPublisher(CONFIG['mqtt']) if CONFIG['mqtt']['enabled'] else None
        self.cloud_publisher = CloudPublisher(CONFIG['cloud_apis'])
        self.running = False
        # Set by stop(); lets the worker loops wake from their waits at once.
        self._stop_event = threading.Event()
        self.last_cleanup = datetime.now()

    def _pause(self, seconds) -> bool:
        """Wait up to ``seconds``; return True as soon as a stop was requested."""
        return self._stop_event.wait(seconds)

    def start(self):
        """Connect MQTT (if enabled) and start the reading and cleanup threads."""
        logger.info("Starting Weather Station...")

        # Connect to MQTT if enabled
        if self.mqtt_publisher and not self.mqtt_publisher.connect():
            logger.warning("MQTT connection failed, continuing without MQTT")

        self._stop_event.clear()
        self.running = True

        # Start main reading loop in thread
        self.reading_thread = threading.Thread(target=self.reading_loop)
        self.reading_thread.daemon = True
        self.reading_thread.start()

        # Start cleanup thread
        self.cleanup_thread = threading.Thread(target=self.cleanup_loop)
        self.cleanup_thread.daemon = True
        self.cleanup_thread.start()

        logger.info("Weather Station started successfully")

    def reading_loop(self):
        """Sample, store and publish readings until the station is stopped."""
        while self.running:
            try:
                weather_data = self.sensor_manager.read_all_sensors()

                if weather_data:
                    # Store in database
                    self.database.insert_reading(weather_data)

                    # Publish to MQTT
                    if self.mqtt_publisher:
                        self.mqtt_publisher.publish_data(weather_data)

                    # Cloud uploads run in their own thread so a slow HTTP
                    # call cannot delay the next sensor sweep.
                    threading.Thread(
                        target=self.publish_to_cloud,
                        args=(weather_data,),
                        daemon=True
                    ).start()

                    logger.info(f"Weather reading: {weather_data.temperature}°C, "
                                f"{weather_data.humidity}% RH, {weather_data.pressure} hPa")

                # Wait for next reading (returns early on stop)
                self._pause(CONFIG['readings']['interval_seconds'])
            except Exception as e:
                logger.error(f"Error in reading loop: {e}")
                self._pause(5)  # Wait before retrying

    def cleanup_loop(self):
        """Once a day, delete rows older than the configured retention."""
        while self.running:
            try:
                if datetime.now() - self.last_cleanup > timedelta(days=1):
                    self.database.cleanup_old_data(CONFIG['database']['retention_days'])
                    self.last_cleanup = datetime.now()
                    logger.info("Database cleanup completed")

                # Re-check hourly (returns early on stop)
                self._pause(3600)
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")
                self._pause(3600)

    def publish_to_cloud(self, data: WeatherData):
        """Send ``data`` to every cloud API that is enabled in CONFIG."""
        try:
            # WeatherCloud
            if CONFIG['cloud_apis']['weathercloud']['enabled']:
                self.cloud_publisher.publish_to_weathercloud(data)

            # ThingSpeak
            if CONFIG['cloud_apis']['thingspeak']['enabled']:
                self.cloud_publisher.publish_to_thingspeak(data)
        except Exception as e:
            logger.error(f"Cloud publishing error: {e}")

    def stop(self):
        """Stop the worker threads, then release MQTT and GPIO."""
        logger.info("Stopping Weather Station...")
        self.running = False
        self._stop_event.set()

        # Give an in-flight sensor sweep the chance to finish before the pins
        # are released; the timeout keeps shutdown bounded.
        current = threading.current_thread()
        for attr in ('reading_thread', 'cleanup_thread'):
            worker = getattr(self, attr, None)
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout=self.SHUTDOWN_TIMEOUT)

        if self.mqtt_publisher:
            self.mqtt_publisher.client.loop_stop()
            self.mqtt_publisher.client.disconnect()

        # Cleanup GPIO
        GPIO.cleanup()

        logger.info("Weather Station stopped")
class WebDashboard:
    """Render the most recent reading as a self-refreshing HTML page."""

    def __init__(self, weather_station: WeatherStation):
        self.weather_station = weather_station
        self.database = weather_station.database

    def generate_html_report(self) -> str:
        """Build the dashboard page from the last 24 hours of readings.

        The first row returned by the database is shown as the current
        conditions.  When there are no rows, a minimal placeholder page is
        returned instead.
        """
        rows = self.database.get_recent_readings(hours=24)
        if not rows:
            return "<html><body><h1>No data available</h1></body></html>"

        newest = rows[0]
        # Resolve the rain indicator up front so the template stays readable.
        if newest['rain_detected']:
            rain_css, rain_text = 'rain', '🌧️ Yes'
        else:
            rain_css, rain_text = 'no-rain', '☀️ No'

        page = f"""
<!DOCTYPE html>
<html>
<head>
<title>Weather Station Dashboard</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ text-align: center; color: #333; }}
.current {{ background: #f0f8ff; padding: 20px; border-radius: 10px; margin: 20px 0; }}
.current h2 {{ color: #2c3e50; }}
.metric {{ display: inline-block; margin: 10px 20px; text-align: center; }}
.metric-value {{ font-size: 2em; font-weight: bold; color: #3498db; }}
.metric-label {{ font-size: 0.9em; color: #7f8c8d; }}
.status {{ color: #27ae60; }}
.warning {{ color: #f39c12; }}
.danger {{ color: #e74c3c; }}
</style>
<script>
function refreshData() {{ location.reload(); }}
setInterval(refreshData, 60000); // Refresh every minute
</script>
</head>
<body>
<div class="header">
<h1>🌤️ Weather Station Dashboard</h1>
<p>Last updated: {newest['timestamp']}</p>
</div>
<div class="current">
<h2>Current Conditions</h2>
<div class="metric">
<div class="metric-value">{newest['temperature']}°C</div>
<div class="metric-label">Temperature</div>
</div>
<div class="metric">
<div class="metric-value">{newest['humidity']}%</div>
<div class="metric-label">Humidity</div>
</div>
<div class="metric">
<div class="metric-value">{newest['pressure']} hPa</div>
<div class="metric-label">Pressure</div>
</div>
<div class="metric">
<div class="metric-value">{newest['light_level']}</div>
<div class="metric-label">Light Level</div>
</div>
<div class="metric">
<div class="metric-value">{newest['wind_speed']} m/s</div>
<div class="metric-label">Wind Speed</div>
</div>
<div class="metric">
<div class="metric-value">{newest['wind_direction']}°</div>
<div class="metric-label">Wind Direction</div>
</div>
<div class="metric">
<div class="metric-value {rain_css}">
{rain_text}
</div>
<div class="metric-label">Rain Detected</div>
</div>
</div>
<div style="text-align: center; margin-top: 20px;">
<button onclick="refreshData()">Refresh Data</button>
</div>
</body>
</html>
"""
        return page
def main():
    """Run the weather station until interrupted, keeping the dashboard file fresh.

    The HTML dashboard is written once at startup and then rewritten every
    minute, matching the page's own one-minute auto-reload, so the file on
    disk follows the readings instead of staying at its startup content.
    The station is stopped on Ctrl+C and also when an unexpected error ends
    the run.
    """
    print("🌤️ Raspberry Pi IoT Weather Station")
    print("=" * 50)

    dashboard_path = 'weather_dashboard.html'
    refresh_seconds = 60
    # Bound before the try block so the handlers can tell whether there is
    # anything to stop.
    weather_station = None

    def write_dashboard(dashboard):
        # The page contains emoji and declares utf-8, so the encoding must
        # not depend on the system locale.
        with open(dashboard_path, 'w', encoding='utf-8') as f:
            f.write(dashboard.generate_html_report())

    try:
        # Initialize and start weather station
        weather_station = WeatherStation()
        weather_station.start()

        # Create web dashboard and write the first report
        # (the file could be served by a web server)
        dashboard = WebDashboard(weather_station)
        write_dashboard(dashboard)

        print("Weather station started successfully!")
        print(f"Dashboard available at: weather_dashboard.html")
        print(f"Database: {CONFIG['database']['path']}")
        print(f"MQTT enabled: {CONFIG['mqtt']['enabled']}")
        print("Press Ctrl+C to stop...")

        # Keep running until interrupted, refreshing the dashboard file.
        while True:
            time.sleep(refresh_seconds)
            try:
                write_dashboard(dashboard)
            except Exception as e:
                # A failed refresh must not take the station down.
                logger.error(f"Dashboard refresh failed: {e}")

    except KeyboardInterrupt:
        print("\nShutting down...")
        if weather_station is not None:
            weather_station.stop()
        print("Weather station stopped gracefully")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        # Release GPIO/MQTT even when the run ends with an error.
        if weather_station is not None:
            weather_station.stop()


if __name__ == "__main__":
    main()
💻 Настройка Кластера Raspberry Pi bash
Полное руководство по созданию и управлению кластером Raspberry Pi для распределенных вычислений, балансировки нагрузки и приложений высокой доступности
#!/bin/bash
# Raspberry Pi Cluster Setup and Management Script
# Complete automation for setting up a multi-node Raspberry Pi cluster
# Abort as soon as any command exits with a non-zero status.
set -e
# Configuration
# NOTE(review): CLUSTER_NAME is not referenced anywhere else in the visible script.
CLUSTER_NAME="pi-cluster"
# Hostname written to /etc/hosts for the master node.
MASTER_NODE="pi-master"
# Worker hostnames; each gets the next IP counting up from START_WORKER_IP.
WORKER_NODES=("pi-worker1" "pi-worker2" "pi-worker3")
# Subnet used for the NFS export; its first three octets also prefix worker IPs.
NETWORK_SUBNET="192.168.1.0/24"
# Address advertised by kubeadm and Docker Swarm on the master.
MASTER_IP="192.168.1.100"
# Last octet of the first worker's IP address.
START_WORKER_IP=101
# Private key used for SSH/SCP to the workers (public key is "<path>.pub").
SSH_KEY_PATH="$HOME/.ssh/pi_cluster_key"
# NOTE(review): KUBERNETES_VERSION is not referenced anywhere else in the visible script.
KUBERNETES_VERSION="v1.28.0"
# Colors for output (ANSI escape sequences, interpreted by `echo -e`)
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
# NOTE(review): BLUE is not referenced anywhere else in the visible script.
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Print an informational message in green, prefixed with a timestamp.
#   $1 - message text
log() {
    local stamp
    stamp="$(date '+%Y-%m-%d %H:%M:%S')"
    echo -e "${GREEN}[${stamp}] $1${NC}"
}
# Print a warning in yellow, prefixed with a timestamp.
#   $1 - message text
# Written to stderr so warnings stay out of captured or piped stdout.
warn() {
    echo -e "${YELLOW}[$(date '+%Y-%m-%d %H:%M:%S')] WARNING: $1${NC}" >&2
}
# Print an error in red, prefixed with a timestamp, then exit the script with status 1.
#   $1 - message text
# Written to stderr so errors stay out of captured or piped stdout.
error() {
    echo -e "${RED}[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $1${NC}" >&2
    exit 1
}
# Verify that the tools the script relies on are installed.
# Exits (via error) when ssh, scp or ping is missing; only warns when the
# host does not identify itself as a Raspberry Pi.
check_dependencies() {
    log "Checking dependencies..."

    local spec tool label
    # Each entry is "<command>:<name used in the error message>".
    for spec in "ssh:SSH" "scp:SCP" "ping:Ping"; do
        tool="${spec%%:*}"
        label="${spec##*:}"
        if ! command -v "${tool}" >/dev/null 2>&1; then
            error "${label} not installed"
        fi
    done

    # Check if running on Raspberry Pi
    grep -q "Raspberry Pi" /proc/cpuinfo \
        || warn "Not running on Raspberry Pi. Some features may not work."

    log "Dependencies check completed"
}
# Setup SSH keys for cluster communication
# Creates the key pair at SSH_KEY_PATH when it does not exist yet and
# authorizes its public key for the current user.  Safe to run repeatedly:
# the public key is added to authorized_keys only once.
setup_ssh_keys() {
    log "Setting up SSH keys for cluster communication..."

    local ssh_dir="$HOME/.ssh"
    local auth_keys="${ssh_dir}/authorized_keys"

    # The directories must exist before ssh-keygen can write the key files.
    mkdir -p "${ssh_dir}" "$(dirname "${SSH_KEY_PATH}")"
    chmod 700 "${ssh_dir}"

    # Generate SSH key if doesn't exist
    if [[ ! -f "${SSH_KEY_PATH}" ]]; then
        log "Generating SSH key pair..."
        ssh-keygen -t rsa -b 4096 -f "${SSH_KEY_PATH}" -N ""
        log "SSH key generated at ${SSH_KEY_PATH}"
    fi

    # Set correct permissions
    chmod 600 "${SSH_KEY_PATH}"
    chmod 644 "${SSH_KEY_PATH}.pub"

    # Copy to local authorized_keys, skipping the append when the exact key
    # line is already present.
    touch "${auth_keys}"
    if ! grep -qxFf "${SSH_KEY_PATH}.pub" "${auth_keys}"; then
        cat "${SSH_KEY_PATH}.pub" >> "${auth_keys}"
    fi
    chmod 600 "${auth_keys}"

    log "SSH keys setup completed"
}
# Append "<ip> <name>" to /etc/hosts unless that exact line is already there.
#   $1 - IP address
#   $2 - hostname
_add_hosts_entry() {
    local entry="$1 $2"
    if ! grep -qxF -- "${entry}" /etc/hosts; then
        echo "${entry}" | sudo tee -a /etc/hosts > /dev/null
    fi
}

# Configure network settings
# Registers the master and every worker in /etc/hosts.  Worker addresses are
# the first three octets of NETWORK_SUBNET followed by consecutive numbers
# starting at START_WORKER_IP.  Safe to run repeatedly: existing lines are
# not duplicated.
configure_network() {
    log "Configuring network settings..."

    # Update /etc/hosts
    local marker="# Raspberry Pi Cluster"
    if ! grep -qxF -- "${marker}" /etc/hosts; then
        echo "${marker}" | sudo tee -a /etc/hosts > /dev/null
    fi
    _add_hosts_entry "${MASTER_IP}" "${MASTER_NODE}"

    local subnet_prefix="${NETWORK_SUBNET%.*}"
    local worker_ip="${START_WORKER_IP}"
    local worker
    for worker in "${WORKER_NODES[@]}"; do
        _add_hosts_entry "${subnet_prefix}.${worker_ip}" "${worker}"
        worker_ip=$((worker_ip + 1))
    done

    log "Network configuration completed"
}
# Install cluster software dependencies
# Upgrades the system, installs Docker, the Kubernetes tools and utilities,
# fetches a newer Docker Compose binary, and enables the cgroup kernel
# options Kubernetes needs.
# NOTE(review): kubectl/kubeadm/kubelet/kubernetes-cni normally come from the
# Kubernetes apt repository; this function assumes it is already configured.
install_cluster_software() {
    log "Installing cluster software dependencies..."

    # Update system
    sudo apt update
    sudo apt upgrade -y

    # Install essential packages
    sudo apt install -y \
        docker.io \
        docker-compose \
        kubectl \
        kubeadm \
        kubelet \
        kubernetes-cni \
        avahi-daemon \
        nfs-common \
        curl \
        wget \
        git \
        htop \
        iotop \
        nethogs \
        python3 \
        python3-pip \
        python3-venv

    # Install Docker Compose (newer version).  -f makes curl fail on an HTTP
    # error instead of saving the error page as the "binary".
    local compose_url
    compose_url="https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)"
    if sudo curl -fL "${compose_url}" -o /usr/local/bin/docker-compose; then
        sudo chmod +x /usr/local/bin/docker-compose
    else
        warn "Could not download Docker Compose from ${compose_url}; keeping the packaged version"
    fi

    # Add user to docker group
    sudo usermod -aG docker "$USER"

    # Enable and start services
    sudo systemctl enable docker
    sudo systemctl start docker

    # Enable container features for Kubernetes
    local cmdline_file="/boot/cmdline.txt"
    # Newer Raspberry Pi OS releases keep the live file under /boot/firmware.
    if [[ -f /boot/firmware/cmdline.txt ]]; then
        cmdline_file="/boot/firmware/cmdline.txt"
    fi

    if grep -q "cgroup" "${cmdline_file}"; then
        log "cgroup already configured"
    else
        log "Configuring cgroups for Kubernetes..."
        # The kernel command line must stay on ONE line, so the options are
        # appended to the end of line 1 rather than added as a new line.
        sudo sed -i '1 s/$/ cgroup_enable=cpuset cgroup_memory=1 cgroup_enable=memory/' "${cmdline_file}"
    fi

    log "Cluster software installation completed"
}
# Setup Kubernetes master node
# Initializes the control plane with kubeadm, configures kubectl for the
# current user, installs the Flannel network plugin, allows workloads on the
# control-plane node, and saves the worker join command to
# /tmp/k8s-join-command.sh.
setup_kubernetes_master() {
    log "Setting up Kubernetes master node..."

    # Initialize Kubernetes cluster
    # NOTE(review): --ignore-preflight-errors=all hides real problems; narrow it if possible.
    sudo kubeadm init \
        --pod-network-cidr=10.244.0.0/16 \
        --apiserver-advertise-address="${MASTER_IP}" \
        --ignore-preflight-errors=all

    # Setup kubectl for user
    mkdir -p "$HOME/.kube"
    sudo cp -i /etc/kubernetes/admin.conf "$HOME/.kube/config"
    sudo chown "$(id -u):$(id -g)" "$HOME/.kube/config"

    # Install Flannel CNI (network plugin)
    kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml

    # Remove the control-plane taint to allow pods on the master (for small
    # clusters).  Current releases use the "control-plane" key, older ones
    # "master"; a node carries only one of them, so a missing taint must not
    # abort the script under `set -e`.
    local taint
    for taint in node-role.kubernetes.io/control-plane node-role.kubernetes.io/master; do
        kubectl taint nodes --all "${taint}-" 2>/dev/null || true
    done

    # Generate join command for worker nodes.  It embeds a bootstrap token,
    # so the file is created readable by the owner only.
    JOIN_COMMAND=$(sudo kubeadm token create --print-join-command)
    (
        umask 077
        echo "${JOIN_COMMAND}" > /tmp/k8s-join-command.sh
    )
    chmod 700 /tmp/k8s-join-command.sh

    log "Kubernetes master setup completed"
    log "Worker join command saved to /tmp/k8s-join-command.sh"
}
# Setup Docker Swarm (alternative to Kubernetes)
# Initializes a swarm advertised on MASTER_IP, saves a runnable worker join
# command to /tmp/docker-swarm-join-worker.sh, and creates the attachable
# overlay network "cluster-network".
setup_docker_swarm() {
    log "Setting up Docker Swarm..."

    # Initialize Docker Swarm
    sudo docker swarm init --advertise-addr "${MASTER_IP}"

    # Get join token for workers.  -q prints only the token, so the saved
    # file holds a single executable command and none of the explanatory
    # text that the plain subcommand prints around it.
    local worker_token
    worker_token="$(sudo docker swarm join-token -q worker)"
    JOIN_TOKEN="docker swarm join --token ${worker_token} ${MASTER_IP}:2377"

    # Save join command; it embeds a secret, so keep it owner-only.
    (
        umask 077
        echo "${JOIN_TOKEN}" > /tmp/docker-swarm-join-worker.sh
    )
    chmod 700 /tmp/docker-swarm-join-worker.sh

    # Create overlay network for cluster communication
    sudo docker network create --driver overlay --attachable cluster-network

    log "Docker Swarm setup completed"
    log "Worker join command saved to /tmp/docker-swarm-join-worker.sh"
}
# Deploy monitoring stack (Prometheus + Grafana)
# Creates the "monitoring" namespace, installs the Prometheus Operator
# bundle, and deploys a single-replica Grafana with a NodePort service.
deploy_monitoring() {
    log "Deploying monitoring stack..."

    # Create monitoring namespace (idempotent: render, then apply)
    kubectl create namespace monitoring --dry-run=client -o yaml | kubectl apply -f -

    # Deploy Prometheus.  Server-side apply avoids the client-side
    # last-applied annotation, which is too large for the operator's CRDs.
    kubectl apply --server-side -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/main/bundle.yaml

    # Deploy Grafana
    # NOTE(review): the admin password is the default "admin"; change it for
    # anything beyond a lab setup.
    cat > /tmp/grafana-deployment.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grafana
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: grafana
  template:
    metadata:
      labels:
        app: grafana
    spec:
      containers:
      - name: grafana
        image: grafana/grafana:latest
        ports:
        - containerPort: 3000
        env:
        - name: GF_SECURITY_ADMIN_PASSWORD
          value: "admin"
---
apiVersion: v1
kind: Service
metadata:
  name: grafana-service
  namespace: monitoring
spec:
  selector:
    app: grafana
  ports:
  - port: 3000
    targetPort: 3000
  type: NodePort
EOF

    kubectl apply -f /tmp/grafana-deployment.yaml

    log "Monitoring stack deployment completed"
}
# Create cluster management scripts
# Installs three helper commands into /usr/local/bin:
#   cluster-status     - host, resource, Kubernetes and Docker summary
#   deploy-to-cluster  - kubectl apply a manifest and show its rollout
#   scale-cluster      - scale a deployment and wait for the rollout
# The files are written through `sudo tee` because /usr/local/bin is not
# writable by a regular user.
create_management_scripts() {
    log "Creating cluster management scripts..."

    # Cluster status script
    sudo tee /usr/local/bin/cluster-status > /dev/null << 'EOF'
#!/bin/bash
echo "=== Raspberry Pi Cluster Status ==="
echo ""
echo "System Information:"
echo "Hostname: $(hostname)"
echo "OS: $(lsb_release -d | cut -f2)"
echo "Kernel: $(uname -r)"
echo "Uptime: $(uptime -p)"
echo ""
echo "Resource Usage:"
echo "CPU Load: $(uptime | awk -F'load average:' '{print $2}')"
# Plain `free` reports both columns in the same unit, so the ratio is valid;
# the human-readable form can mix units (e.g. Mi vs Gi).
echo "Memory Usage: $(free | awk 'NR==2{printf "%.1f%%", $3*100/$2}')"
echo "Disk Usage: $(df -h / | awk 'NR==2{print $5}')"
echo "Temperature: $(vcgencmd measure_temp | cut -d'=' -f2)"
echo ""
if command -v kubectl >/dev/null 2>&1; then
    echo "Kubernetes Status:"
    kubectl get nodes 2>/dev/null || echo "Kubernetes not running"
fi
if command -v docker >/dev/null 2>&1; then
    echo "Docker Status:"
    docker info 2>/dev/null | grep -E "Nodes|Images|Containers" || echo "Docker not running"
fi
EOF

    # Cluster deployment script
    sudo tee /usr/local/bin/deploy-to-cluster > /dev/null << 'EOF'
#!/bin/bash
if [[ $# -lt 1 ]]; then
    echo "Usage: deploy-to-cluster <deployment-file.yaml>"
    exit 1
fi
DEPLOYMENT_FILE="$1"
echo "Deploying to cluster..."
kubectl apply -f "$DEPLOYMENT_FILE"
echo "Checking deployment status..."
kubectl rollout status deployment/$(basename "$DEPLOYMENT_FILE" .yaml) 2>/dev/null || true
kubectl get pods -l app=$(basename "$DEPLOYMENT_FILE" .yaml)
EOF

    # Scale cluster script
    sudo tee /usr/local/bin/scale-cluster > /dev/null << 'EOF'
#!/bin/bash
if [[ $# -lt 2 ]]; then
    echo "Usage: scale-cluster <deployment-name> <replicas>"
    exit 1
fi
DEPLOYMENT="$1"
REPLICAS="$2"
echo "Scaling $DEPLOYMENT to $REPLICAS replicas..."
kubectl scale deployment "$DEPLOYMENT" --replicas="$REPLICAS"
kubectl rollout status deployment/"$DEPLOYMENT"
EOF

    # Make scripts executable
    sudo chmod +x /usr/local/bin/cluster-status
    sudo chmod +x /usr/local/bin/deploy-to-cluster
    sudo chmod +x /usr/local/bin/scale-cluster

    log "Cluster management scripts created"
}
# Setup distributed file system (NFS)
# Installs the NFS server and exports /shared/cluster read-write to
# NETWORK_SUBNET.  Safe to run repeatedly: the export line is added once.
setup_nfs_server() {
    log "Setting up NFS server..."

    # Install NFS server
    sudo apt install -y nfs-kernel-server

    # Create shared directory
    # NOTE(review): mode 777 lets every client user write here; tighten if not intended.
    sudo mkdir -p /shared/cluster
    sudo chown nobody:nogroup /shared/cluster
    sudo chmod 777 /shared/cluster

    # Configure NFS exports, skipping the append when the line already exists.
    local export_line="/shared/cluster ${NETWORK_SUBNET}(rw,sync,no_subtree_check)"
    if ! sudo grep -qxF -- "${export_line}" /etc/exports 2>/dev/null; then
        echo "${export_line}" | sudo tee -a /etc/exports > /dev/null
    fi

    # Export NFS directory
    sudo exportfs -a
    sudo systemctl restart nfs-kernel-server
    sudo systemctl enable nfs-kernel-server

    log "NFS server setup completed"
}
# Create sample distributed applications
# Writes two example manifests into the current directory:
#   web-cluster.yaml      - 3-replica nginx Deployment plus a LoadBalancer Service
#   computation-job.yaml  - Job running 4 parallel pods that print their hostname
create_sample_apps() {
    log "Creating sample distributed applications..."

    # Web server deployment
    cat > ./web-cluster.yaml << 'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: web-cluster
spec:
  replicas: 3
  selector:
    matchLabels:
      app: web-cluster
  template:
    metadata:
      labels:
        app: web-cluster
    spec:
      containers:
      - name: web-server
        image: nginx:alpine
        ports:
        - containerPort: 80
        resources:
          requests:
            memory: "64Mi"
            cpu: "50m"
          limits:
            memory: "128Mi"
            cpu: "100m"
---
apiVersion: v1
kind: Service
metadata:
  name: web-cluster-service
spec:
  selector:
    app: web-cluster
  ports:
  - port: 80
    targetPort: 80
  type: LoadBalancer
EOF

    # Distributed computation job
    cat > ./computation-job.yaml << 'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: distributed-computation
spec:
  parallelism: 4
  completions: 4
  template:
    spec:
      containers:
      - name: compute
        image: python:3-alpine
        command: ["python3", "-c", "print('Computing on node:', __import__('socket').gethostname())"]
        resources:
          requests:
            memory: "32Mi"
            cpu: "10m"
      restartPolicy: OnFailure
EOF

    log "Sample applications created: web-cluster.yaml, computation-job.yaml"
}
# Performance testing
# Submits two one-off Jobs to the cluster: a dd-based throughput check and a
# network check (ping plus iperf3).  Results are read from the job logs.
run_performance_tests() {
    log "Running cluster performance tests..."

    # CPU benchmark
    cat > /tmp/cpu-benchmark.yaml << 'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: cpu-benchmark
spec:
  template:
    spec:
      containers:
      - name: cpu-test
        image: alpine
        command: ["/bin/sh", "-c"]
        args:
        - |
          echo "Starting CPU benchmark..."
          result=$(dd if=/dev/zero of=/dev/null bs=1M count=1024 2>&1 | grep -o '[0-9.]* MB/s')
          echo "CPU Performance: $result"
          echo "Node: $(hostname)"
      restartPolicy: OnFailure
EOF

    # Network benchmark
    cat > /tmp/network-benchmark.yaml << 'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: network-benchmark
spec:
  template:
    spec:
      containers:
      - name: network-test
        image: alpine
        command: ["/bin/sh", "-c"]
        args:
        - |
          apk add --no-cache iperf3
          echo "Network benchmark results:"
          ping -c 4 8.8.8.8
          echo "---"
          iperf3 -c iperf.he.net -t 10 || echo "Network test completed"
      restartPolicy: OnFailure
EOF

    # Deploy benchmarks
    kubectl apply -f /tmp/cpu-benchmark.yaml
    kubectl apply -f /tmp/network-benchmark.yaml

    log "Performance tests deployed"
    log "Check results with: kubectl logs job/cpu-benchmark && kubectl logs job/network-benchmark"
}
# Cluster maintenance functions
# Upgrade the system packages, Docker, and the Kubernetes tools on this node.
# The Kubernetes packages are released from their apt hold for the upgrade
# and put back on hold afterwards.
update_cluster() {
    log "Updating cluster software..."

    local k8s_packages=(kubelet kubeadm kubectl)

    # Update system packages
    sudo apt update
    sudo apt upgrade -y

    # Update Docker
    sudo apt install --only-upgrade docker.io -y

    # Update Kubernetes components
    sudo apt-mark unhold "${k8s_packages[@]}"
    sudo apt update
    sudo apt install -y "${k8s_packages[@]}"
    sudo apt-mark hold "${k8s_packages[@]}"

    log "Cluster update completed"
}
# Snapshot the cluster configuration into a timestamped directory under the
# current working directory: kubeconfig, Docker config, exported
# deployments/services/configmaps, and a short summary file.
# Sets the global BACKUP_DIR to the directory it created.
backup_cluster() {
    log "Backing up cluster configuration..."

    BACKUP_DIR="./cluster-backup-$(date +%Y%m%d_%H%M%S)"
    mkdir -p "$BACKUP_DIR"

    # Backup Kubernetes configurations
    if [[ -d "$HOME/.kube" ]]; then
        cp -r "$HOME/.kube" "$BACKUP_DIR/"
    fi

    # Backup Docker configurations
    if [[ -d "/etc/docker" ]]; then
        sudo cp -r /etc/docker "$BACKUP_DIR/"
    fi

    # Export running resources; a failing export is tolerated so the backup
    # still completes when the cluster is unreachable.
    local resource
    for resource in deployments services configmaps; do
        kubectl get "$resource" -o yaml > "$BACKUP_DIR/${resource}.yaml" 2>/dev/null || true
    done

    # Create backup info
    cat > "$BACKUP_DIR/backup-info.txt" << EOF
Cluster Backup Information
=======================
Date: $(date)
Hostname: $(hostname)
Cluster Size: $(kubectl get nodes --no-headers | wc -l) nodes
Running Pods: $(kubectl get pods --all-namespaces --no-headers | wc -l)
System Info: $(uname -a)
EOF

    log "Cluster backup completed: $BACKUP_DIR"
}
# Worker node setup functions
# Copy this script to a worker over SSH and run its worker-only setup there.
#   $1 - worker IP address
#   $2 - worker hostname (used in messages only)
# Returns 1, after a warning, when the worker cannot be reached or any step
# fails.  Every step is checked explicitly so the function behaves the same
# whether or not the caller's context suspends `set -e`.
setup_worker_node() {
    local worker_ip="$1"
    local worker_name="$2"
    local target="pi@${worker_ip}"
    # One option set for every call, so the probe, the copy and the remote
    # run treat host keys and timeouts identically.
    local ssh_opts=(-i "${SSH_KEY_PATH}" -o ConnectTimeout=10 -o StrictHostKeyChecking=no)

    log "Setting up worker node: $worker_name ($worker_ip)"

    # Test SSH connection
    if ! ssh "${ssh_opts[@]}" "${target}" "echo 'SSH connection successful'" 2>/dev/null; then
        warn "Cannot connect to worker node $worker_name. Please ensure SSH is configured."
        return 1
    fi

    # Copy setup script to worker
    if ! scp "${ssh_opts[@]}" "$0" "${target}:/tmp/worker-setup.sh"; then
        warn "Could not copy setup script to worker node $worker_name."
        return 1
    fi

    # Execute setup on worker.  The script's command is "setup"; the setup
    # type "worker-only" is its argument.
    if ! ssh "${ssh_opts[@]}" "${target}" "sudo bash /tmp/worker-setup.sh setup worker-only"; then
        warn "Setup failed on worker node $worker_name."
        return 1
    fi

    log "Worker node $worker_name setup completed"
}
# Steps shared by the "master-only" and "full" setup types, in order.
_setup_master_base() {
    check_dependencies
    setup_ssh_keys
    configure_network
    install_cluster_software
    setup_kubernetes_master
    setup_docker_swarm
    setup_nfs_server
}

# Main setup function
#   $1 - setup type: "full" (default), "master-only" or "worker-only"
# "full" additionally deploys monitoring, provisions every worker in
# WORKER_NODES, runs the benchmarks and takes a backup.  Exits via error on
# an unknown type.
setup_cluster() {
    local setup_type="${1:-full}"

    log "Starting Raspberry Pi cluster setup..."
    log "Setup type: $setup_type"

    case "$setup_type" in
        "master-only")
            _setup_master_base
            create_management_scripts
            create_sample_apps
            ;;
        "worker-only")
            install_cluster_software
            log "Worker node setup completed. Run join command on master to add to cluster."
            ;;
        "full"|"")
            _setup_master_base
            deploy_monitoring
            create_management_scripts
            create_sample_apps

            # Setup worker nodes.  An unreachable worker is skipped with a
            # warning instead of aborting the whole run under `set -e`.
            local worker
            local worker_ip="${START_WORKER_IP}"
            for worker in "${WORKER_NODES[@]}"; do
                setup_worker_node "${NETWORK_SUBNET%.*}.${worker_ip}" "$worker" \
                    || warn "Skipping worker $worker; continuing with the remaining nodes"
                worker_ip=$((worker_ip + 1))
            done

            run_performance_tests
            backup_cluster
            ;;
        *)
            error "Invalid setup type: $setup_type"
            ;;
    esac

    log "Cluster setup completed successfully!"

    if [[ "$setup_type" != "worker-only" ]]; then
        echo ""
        echo "=== Cluster Management Commands ==="
        echo "Check cluster status: cluster-status"
        echo "Deploy application: deploy-to-cluster <deployment.yaml>"
        echo "Scale application: scale-cluster <deployment> <replicas>"
        echo "Access Kubernetes: kubectl get nodes"
        echo "Access Docker Swarm: docker node ls"
        # Monitoring is deployed by the "full" setup only.
        # NOTE(review): Grafana is exposed through a NodePort service, so the
        # externally reachable port is probably not 3000 - confirm.
        if [[ "$setup_type" != "master-only" ]]; then
            echo "Grafana URL: http://${MASTER_IP}:3000 (admin/admin)"
        fi
        echo ""
        echo "Join Commands for Worker Nodes:"
        if [[ -f /tmp/k8s-join-command.sh ]]; then
            echo "Kubernetes:"
            cat /tmp/k8s-join-command.sh
        fi
        if [[ -f /tmp/docker-swarm-join-worker.sh ]]; then
            echo ""
            echo "Docker Swarm:"
            cat /tmp/docker-swarm-join-worker.sh
        fi
    fi
}
# Print the command-line help: available commands followed by examples.
show_usage() {
    # Unquoted delimiter so $0 expands to the name the script was run as.
    cat << EOF
Usage: $0 [COMMAND] [OPTIONS]

Commands:
 setup [TYPE] - Setup cluster (full, master-only, worker-only)
 update - Update cluster software
 backup - Backup cluster configuration
 status - Show cluster status
 benchmark - Run performance benchmarks
 help - Show this help

Examples:
 $0 setup full # Setup complete cluster
 $0 setup master-only # Setup master node only
 $0 update # Update all cluster nodes
 $0 backup # Create cluster backup
EOF
}
# Main execution
# Dispatch on the first argument (default: "setup").
#   $1 - command: setup | update | backup | status | benchmark | help
#   $2 - setup type, forwarded to setup_cluster (default: "full")
# An unknown command prints the usage text and exits with status 1.
main() {
    local command="${1:-setup}"

    case "$command" in
        "setup")
            setup_cluster "${2:-full}"
            ;;
        "update")
            update_cluster
            ;;
        "backup")
            backup_cluster
            ;;
        "status")
            cluster-status
            ;;
        "benchmark")
            run_performance_tests
            ;;
        "help"|"--help"|"-h")
            show_usage
            ;;
        *)
            # error exits the script, so the usage text has to come first.
            show_usage >&2
            error "Unknown command: $command"
            ;;
    esac
}
# Cleanup function
# Runs on every script exit through the EXIT trap registered below.
cleanup() {
    log "Cleaning up..."
    # Add any necessary cleanup here
}

# Trap for cleanup
trap cleanup EXIT

# Execute main function
main "$@"