InfluxDB Examples

InfluxDB time series database examples covering data modeling, queries, and monitoring
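
At the core of the data model is the InfluxDB line protocol: each point consists of a measurement name, optional indexed tags, one or more fields, and a timestamp. A representative line (values are illustrative):

environmental,location=office,sensor_id=temp_001 temperature=22.5,humidity=48.2 1700000000000000000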

💻 InfluxDB Basic Operations python

🟢 simple ⭐⭐

Fundamental InfluxDB operations, including writing data, reading it back, and basic queries

⏱️ 20 min 🏷️ influxdb, time series, iot, monitoring, python
Prerequisites: Python programming, Basic understanding of time series data
# InfluxDB Basic Operations
# Python - Using influxdb-client library

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
import time
import random

# Configuration
url = "http://localhost:8086"
token = "your-token-here"
org = "your-org"
bucket = "example-bucket"
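
# The values above are placeholders; when set, environment variables take
# precedence (the variable names below are illustrative, not required by the client):
import os

url = os.environ.get("INFLUXDB_URL", url)
token = os.environ.get("INFLUXDB_TOKEN", token)
org = os.environ.get("INFLUXDB_ORG", org)
bucket = os.environ.get("INFLUXDB_BUCKET", bucket)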

# Initialize client
class InfluxDBManager:
    def __init__(self, url, token, org):
        self.url = url
        self.token = token
        self.org = org
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    # 1. Writing Data Points
    def write_single_point(self, measurement, tags, fields, timestamp=None):
        """Write a single data point"""
        point = Point(measurement)

        # Add tags
        for key, value in tags.items():
            point.tag(key, value)

        # Add fields
        for key, value in fields.items():
            point.field(key, value)

        # Set timestamp
        if timestamp:
            point.time(timestamp, WritePrecision.NS)

        try:
            self.write_api.write(bucket=bucket, record=point)
            print(f"✅ Written point to {measurement}")
        except Exception as e:
            print(f"❌ Error writing point: {e}")

    def write_multiple_points(self, points):
        """Write multiple data points at once"""
        try:
            self.write_api.write(bucket=bucket, record=points)
            print(f"✅ Written {len(points)} points")
        except Exception as e:
            print(f"❌ Error writing points: {e}")

    # 2. Data Modeling Examples
    def write_sensor_data(self):
        """Write IoT sensor data"""
        # Temperature sensor readings
        for i in range(10):
            temperature = 20 + random.uniform(-5, 15)
            humidity = 60 + random.uniform(-20, 30)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i*5)

            point = Point("environmental")                 .tag("location", "office")                 .tag("sensor_id", "temp_001")                 .tag("room", "conference")                 .field("temperature", temperature)                 .field("humidity", humidity)                 .time(timestamp, WritePrecision.NS)

            self.write_api.write(bucket=bucket, record=point)

        # CPU and memory metrics
        for i in range(20):
            cpu_usage = random.uniform(20, 90)
            memory_usage = random.uniform(30, 85)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i)

            point = Point("system_metrics")                 .tag("host", "web-server-01")                 .tag("region", "us-west-1")                 .tag("environment", "production")                 .field("cpu_usage", cpu_usage)                 .field("memory_usage", memory_usage)                 .field("disk_usage", random.uniform(40, 70))                 .time(timestamp, WritePrecision.NS)

            self.write_api.write(bucket=bucket, record=point)

    def write_application_metrics(self):
        """Write application performance metrics"""
        for i in range(15):
            response_time = random.uniform(100, 2000)
            error_count = random.choices([0, 1, 2, 3], weights=[70, 20, 8, 2])[0]
            active_users = random.randint(100, 500)

            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i*2)

            point = Point("application_metrics")                 .tag("service", "user-service")                 .tag("endpoint", "/api/users")                 .tag("version", "v1.2.3")                 .field("response_time", response_time)                 .field("error_count", error_count)                 .field("active_users", active_users)                 .field("throughput", random.uniform(50, 200))                 .time(timestamp, WritePrecision.NS)

            self.write_api.write(bucket=bucket, record=point)

    # 3. Querying Data
    def query_recent_data(self, measurement, duration="1h"):
        """Query recent data from a measurement"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> limit(n: 10)
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            records = []

            for table in result:
                for record in table.records:
                    records.append({
                        'time': record.get_time(),
                        'field': record.get_field(),
                        'value': record.get_value(),
                        'measurement': record.get_measurement(),
                        'tags': record.values
                    })

            print(f"📊 Found {len(records)} records from {measurement}")
            return records
        except Exception as e:
            print(f"❌ Error querying data: {e}")
            return []

    def query_by_tags(self, measurement, tag_filters, duration="24h"):
        """Query data with specific tag filters"""
        tag_conditions = []
        for key, value in tag_filters.items():
            tag_conditions.append(f'r.{key} == "{value}"')

        tag_filter = ' and '.join(tag_conditions)

        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> filter(fn: (r) => {tag_filter})
            |> sort(columns: ["_time"])
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying by tags: {e}")
            return []

    def query_aggregated_data(self, measurement, field, aggregation="mean", window="5m", duration="1h"):
        """Query aggregated data"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> filter(fn: (r) => r._field == "{field}")
            |> aggregateWindow(every: {window}, fn: {aggregation}, createEmpty: false)
            |> yield(name: "{aggregation}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying aggregated data: {e}")
            return []

    def _process_query_result(self, result):
        """Process query result into list of dictionaries"""
        records = []
        for table in result:
            for record in table.records:
                records.append({
                    'time': record.get_time(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    'measurement': record.get_measurement(),
                    'tags': {k: v for k, v in record.values.items() if not k.startswith('_')}
                })
        return records

    # 4. Advanced Queries
    def query_downsampled_data(self, measurement, duration="24h", window="1h"):
        """Query downsampled data with multiple aggregations"""
        query = f'''
        data = from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")

        data
            |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
            |> yield(name: "mean")

        data
            |> aggregateWindow(every: {window}, fn: max, createEmpty: false)
            |> yield(name: "max")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying downsampled data: {e}")
            return []

    def query_with_transformations(self):
        """Query with data transformations"""
        query = '''
        from(bucket: "example-bucket")
            |> range(start: -1h)
            |> filter(fn: (r) => r._measurement == "system_metrics")
            |> filter(fn: (r) => r._field == "cpu_usage")
            |> map(fn: (r) => ({ r with _value: r._value / 100.0 }))
            |> filter(fn: (r) => r._value > 0.8)
            |> sort(columns: ["_value"], desc: true)
            |> limit(n: 5)
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error with transformed query: {e}")
            return []

    # 5. Batch Operations
    def batch_write_sensor_data(self, num_points=100):
        """Write multiple sensor data points in batch"""
        points = []
        locations = ["office", "warehouse", "factory", "datacenter"]

        for i in range(num_points):
            location = random.choice(locations)
            temperature = 20 + random.uniform(-10, 20)
            humidity = 50 + random.uniform(-30, 40)
            pressure = 1000 + random.uniform(-50, 50)

            point = Point("environmental_sensors")                 .tag("location", location)                 .tag("sensor_type", "weather_station")                 .field("temperature", temperature)                 .field("humidity", humidity)                 .field("pressure", pressure)                 .time(datetime.datetime.now() - datetime.timedelta(seconds=i*30), WritePrecision.NS)

            points.append(point)

        self.write_multiple_points(points)
        return points

# Usage Examples
def demonstrate_basic_operations():
    """Demonstrate basic InfluxDB operations"""

    with InfluxDBManager(url, token, org) as db:
        print("🚀 Starting InfluxDB Basic Operations Demo")

        # 1. Write single data points
        print("
📝 Writing single data points...")
        db.write_single_point(
            measurement="test_data",
            tags={"source": "demo", "version": "1.0"},
            fields={"value": 42.5, "status": "ok"}
        )

        # 2. Write sensor data
        print("
🌡️ Writing sensor data...")
        db.write_sensor_data()

        # 3. Write application metrics
        print("
📊 Writing application metrics...")
        db.write_application_metrics()

        # 4. Query recent data
        print("
🔍 Querying recent data...")
        recent_data = db.query_recent_data("environmental", "30m")
        for record in recent_data[:5]:
            print(f"  {record['time']}: {record['field']} = {record['value']}")

        # 5. Query with tag filters
        print("
🏷️ Querying with tag filters...")
        filtered_data = db.query_by_tags(
            measurement="system_metrics",
            tag_filters={"host": "web-server-01", "environment": "production"},
            duration="1h"
        )
        print(f"Found {len(filtered_data)} records with specified tags")

        # 6. Query aggregated data
        print("
📈 Querying aggregated data...")
        avg_cpu = db.query_aggregated_data(
            measurement="system_metrics",
            field="cpu_usage",
            aggregation="mean",
            window="10m",
            duration="1h"
        )
        print(f"Average CPU usage points: {len(avg_cpu)}")

        # 7. Batch write
        print("
💾 Batch writing sensor data...")
        batch_points = db.batch_write_sensor_data(50)
        print(f"Batch written {len(batch_points)} points")

# 6. Data Exploration
class InfluxDBExplorer:
    def __init__(self, url, token, org):
        self.org = org
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.query_api = self.client.query_api()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    def list_measurements(self, bucket="example-bucket"):
        """List all measurements in a bucket"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.measurements(bucket: "{bucket}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            measurements = [record.get_value() for table in result for record in table.records]
            print(f"📋 Measurements in bucket '{bucket}': {measurements}")
            return measurements
        except Exception as e:
            print(f"❌ Error listing measurements: {e}")
            return []

    def show_tag_keys(self, measurement, bucket="example-bucket"):
        """Show all tag keys for a measurement"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.tagKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            tag_keys = [record.get_value() for table in result for record in table.records]
            print(f"🏷️ Tag keys for '{measurement}': {tag_keys}")
            return tag_keys
        except Exception as e:
            print(f"❌ Error showing tag keys: {e}")
            return []

    def show_field_keys(self, measurement, bucket="example-bucket"):
        """Show all field keys for a measurement"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.fieldKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            field_keys = [record.get_value() for table in result for record in table.records]
            print(f"🔢 Field keys for '{measurement}': {field_keys}")
            return field_keys
        except Exception as e:
            print(f"❌ Error showing field keys: {e}")
            return []

    def get_bucket_stats(self, bucket="example-bucket"):
        """Get basic statistics for a bucket"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -24h)
            |> group(columns: ["_measurement"])
            |> count()
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            stats = {}
            for table in result:
                measurement = table.records[0].values.get("_measurement")
                count = table.records[0].get_value()
                stats[measurement] = count

            print(f"📊 Bucket statistics for '{bucket}':")
            for measurement, count in stats.items():
                print(f"  {measurement}: {count} points")

            return stats
        except Exception as e:
            print(f"❌ Error getting bucket stats: {e}")
            return {}

def demonstrate_exploration():
    """Demonstrate data exploration capabilities"""
    with InfluxDBExplorer(url, token, org) as explorer:
        print("
🔍 Exploring InfluxDB Data Structure")

        measurements = explorer.list_measurements()
        if measurements:
            for measurement in measurements[:3]:  # Show first 3
                explorer.show_tag_keys(measurement)
                explorer.show_field_keys(measurement)

        explorer.get_bucket_stats()
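
# The demos above use the synchronous write API; for high-volume ingestion the
# client also offers batched background writes via WriteOptions. A minimal
# sketch (the batch parameters are illustrative, not tuned values):
def batched_write_example():
    from influxdb_client import WriteOptions

    with InfluxDBClient(url=url, token=token, org=org) as client:
        # Points are buffered and flushed in groups of batch_size, or every
        # flush_interval milliseconds, whichever comes first.
        with client.write_api(write_options=WriteOptions(batch_size=500,
                                                         flush_interval=10_000)) as write_api:
            write_api.write(bucket=bucket, record="mem,host=host1 used_percent=23.4")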

if __name__ == "__main__":
    # Run the demonstrations
    demonstrate_basic_operations()
    demonstrate_exploration()

💻 InfluxDB Monitoring and Alerting yaml

🔴 advanced ⭐⭐⭐⭐

Configuring dashboards, alert rules, and notification systems

⏱️ 35 min 🏷️ influxdb, monitoring, alerting, devops, telegraf
Prerequisites: Advanced InfluxDB operations, System administration, Monitoring concepts
# InfluxDB Monitoring and Alerting
# Configuration files and examples for setting up monitoring systems

# 1. Telegraf Configuration for System Monitoring
# /etc/telegraf/telegraf.conf

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"

# Output to InfluxDB
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-auth-token"
  organization = "your-org"
  bucket = "monitoring"

# System input plugins
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false
  core_tags = true

[[inputs.mem]]
  fieldpass = ["available", "available_percent", "buffered", "cached", "free", "total", "used", "used_percent"]

[[inputs.disk]]
  mount_points = ["/"]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]

[[inputs.diskio]]

[[inputs.kernel]]
  # No configuration needed, collects kernel metrics

[[inputs.processes]]
  collect_parent_pid = true

[[inputs.swap]]
  fieldpass = ["used", "used_percent", "free", "total"]

[[inputs.system]]
  fieldpass = ["uptime_format", "load1", "load5", "load15"]

# Network monitoring
[[inputs.net]]
  fieldpass = ["bytes_sent", "bytes_recv", "packets_sent", "packets_recv", "err_in", "err_out", "drop_in", "drop_out"]

[[inputs.netstat]]
  fieldpass = ["tcp_listen", "tcp_established", "tcp_close_wait", "tcp_time_wait", "udp_listen"]

# Application monitoring
[[inputs.prometheus]]
  urls = ["http://localhost:9090/metrics"]
  metric_version = 2

[[inputs.http_response]]
  urls = ["http://localhost:8080/health", "http://localhost:3000/api/health"]
  timeout = "5s"
  method = "GET"
  follow_redirects = true
  response_timeout = "5s"
  response_string_match = "ok"

# Docker monitoring
[[inputs.docker]]
  endpoint = "unix:///var/run/docker.sock"
  container_names = []
  timeout = "5s"
  perdevice = true
  total = true
  docker_label_include = []
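
# A configuration like this can be validated before enabling the service by
# running Telegraf once in test mode, which prints collected metrics to
# stdout instead of writing them to InfluxDB:
#   telegraf --config /etc/telegraf/telegraf.conf --test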

# 2. InfluxDB 2.x Alerting Configuration
# alert-rules.yaml

apiVersion: 1
groups:
  - name: system_alerts
    rules:
      - alert: HighCPUUsage
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "cpu")
            |> filter(fn: (r) => r._field == "usage_active")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 80.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High CPU usage detected: {{ index .Tags "host" }} has {{ .Values.C }}% CPU usage"
        labels:
          severity: warning
          service: system

      - alert: HighMemoryUsage
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "mem")
            |> filter(fn: (r) => r._field == "used_percent")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 85.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High memory usage on {{ index .Tags "host" }}: {{ .Values.C }}% used"
        labels:
          severity: warning
          service: system

      - alert: DiskSpaceLow
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "disk")
            |> filter(fn: (r) => r._field == "used_percent")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 90.0)
        status: OK
        statusRules:
          - current: [OK, Critical]
            color: "red"
        message: "Disk space critically low on {{ index .Tags "host" }}/{{ index .Tags "path" }}: {{ .Values.C }}% used"
        labels:
          severity: critical
          service: system

  - name: application_alerts
    rules:
      - alert: HighErrorRate
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "application_metrics")
            |> filter(fn: (r) => r._field == "error_rate")
            |> filter(fn: (r) => r._value > 5.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High error rate in {{ index .Tags "service" }}: {{ .Values.C }}% errors"
        labels:
          severity: warning
          service: application

      - alert: HighResponseTime
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "application_metrics")
            |> filter(fn: (r) => r._field == "response_time_p95")
            |> filter(fn: (r) => r._value > 1000.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High response time in {{ index .Tags "service" }}: {{ .Values.C }}ms P95"
        labels:
          severity: warning
          service: application

      - alert: ServiceDown
        type: threshold
        thresholds:
          - value: 0
            level: critical
        message: "Service {{ index .Tags "url" }} is down"
        labels:
          severity: critical
          service: availability

  - name: custom_threshold_alerts
    rules:
      - alert: TemperatureHigh
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -2m)
            |> filter(fn: (r) => r._measurement == "sensor_data")
            |> filter(fn: (r) => r._field == "temperature")
            |> filter(fn: (r) => r._value > 35.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High temperature alert: {{ index .Tags "location" }} sensor reports {{ .Values.C }}°C"
        labels:
          severity: warning
          service: environmental

# 3. Notification Endpoints Configuration
# /etc/influxdb2/notifiers.yaml

apiVersion: 1
notifiers:
  - name: SlackNotification
    type: slack
    url: https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
    username: InfluxDBAlerts
    channel: "#alerts"
    color: critical
    template_path: /etc/influxdb2/templates/slack.tmpl

  - name: EmailNotification
    type: smtp
    username: alerts@example.com
    password: your-email-password
    host: smtp.gmail.com
    port: 587
    from: alerts@example.com
    to: "oncall@example.com,devops@example.com"
    subject: "InfluxDB Alert: {{ .Title }}"
    template_path: /etc/influxdb2/templates/email.tmpl

  - name: PagerDutyNotification
    type: pagerduty
    serviceKey: your-pagerduty-service-key
    severity: critical
    details:
      firing: "{{ .State }}"
      threshold: "{{ .Thresholds }}"

  - name: WebhookNotification
    type: http
    url: http://your-webhook-endpoint/alerts
    method: POST
    headers:
      Authorization: "Bearer your-token"
      Content-Type: "application/json"
    body: |
      {
        "alert": "{{ .Title }}",
        "message": "{{ .Message }}",
        "severity": "{{ index .Labels "severity" }}",
        "timestamp": "{{ .Time }}",
        "tags": {{ .Tags | toJSON }}
      }

# 4. Notification Templates
# /etc/influxdb2/templates/slack.tmpl

{{ range .Alerts }}
{{ if eq .Status "firing" }}
🚨 **ALERT: {{ .Title }}**
• Message: {{ .Message }}
• Severity: {{ index .Labels "severity" }}
• Time: {{ .Time }}
• Tags: {{ range $key, $value := .Tags }}{{ $key }}={{ $value }} {{ end }}
{{ end }}
{{ if eq .Status "resolved" }}
✅ **RESOLVED: {{ .Title }}**
• Message: {{ .Message }}
• Resolved at: {{ .Time }}
{{ end }}
{{ end }}

# /etc/influxdb2/templates/email.tmpl

Subject: InfluxDB Alert: {{ .Title }}

{{ range .Alerts }}
{{ if eq .Status "firing" }}
ALERT FIRED

Title: {{ .Title }}
Message: {{ .Message }}
Severity: {{ index .Labels "severity" }}
Service: {{ index .Labels "service" }}
Time: {{ .Time }}
Tags:
{{ range $key, $value := .Tags }}
- {{ $key }}: {{ $value }}
{{ end }}

{{ end }}
{{ if eq .Status "resolved" }}
ALERT RESOLVED

Title: {{ .Title }}
Message: {{ .Message }}
Resolved at: {{ .Time }}
Duration: {{ .Duration }}

{{ end }}
{{ end }}

# 5. Grafana Dashboard Configuration
# grafana-dashboard.json (excerpt)

{
  "dashboard": {
    "title": "System Monitoring Dashboard",
    "tags": ["system", "influxdb", "telegraf"],
    "timezone": "browser",
    "panels": [
      {
        "title": "CPU Usage",
        "type": "stat",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"cpu\") |> filter(fn: (r) => r._field == \"usage_active\") |> last()"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 70},
                {"color": "red", "value": 90}
              ]
            }
          }
        }
      },
      {
        "title": "Memory Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"mem\") |> filter(fn: (r) => r._field == \"used_percent\")"
          }
        ],
        "yAxes": [
          {
            "max": 100,
            "min": 0,
            "unit": "percent"
          }
        ]
      },
      {
        "title": "Disk Usage by Mount Point",
        "type": "piechart",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"disk\") |> filter(fn: (r) => r._field == \"used_percent\") |> last()"
          }
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "30s"
  }
}

# 6. Python Alert Manager Integration
# alert_manager.py

import requests

class InfluxDBAlertManager:
    def __init__(self, influx_url: str, token: str, org: str):
        self.base_url = influx_url
        self.token = token
        self.org = org
        self.headers = {
            "Authorization": f"Token {token}",
            "Content-Type": "application/json"
        }

    def create_check(self, check_name: str, query: str, every: str, offset: str = "0s"):
        """Create an alert check in InfluxDB"""
        check_data = {
            "name": check_name,
            "query": query,
            "every": every,
            "offset": offset,
            "status": "active",
            "labels": {
                "source": "custom_alerts"
            }
        }

        url = f"{self.base_url}/api/v2/checks"
        response = requests.post(url, headers=self.headers, json=check_data)

        if response.status_code == 201:
            print(f"✅ Alert check '{check_name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create alert check: {response.text}")
            return None

    def get_alert_endpoints(self):
        """Get all notification endpoints"""
        url = f"{self.base_url}/api/v2/notificationEndpoints"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()["notificationEndpoints"]
        else:
            print(f"❌ Failed to get alert endpoints: {response.text}")
            return []

    def create_slack_endpoint(self, name: str, webhook_url: str, channel: str):
        """Create a Slack notification endpoint"""
        endpoint_data = {
            "name": name,
            "type": "slack",
            "url": webhook_url,
            "username": "InfluxDB Alerts",
            "channel": channel
        }

        url = f"{self.base_url}/api/v2/notificationEndpoints"
        response = requests.post(url, headers=self.headers, json=endpoint_data)

        if response.status_code == 201:
            print(f"✅ Slack endpoint '{name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create Slack endpoint: {response.text}")
            return None

    def create_notification_rule(self, rule_name: str, check_id: str, endpoint_id: str):
        """Create a notification rule"""
        rule_data = {
            "name": rule_name,
            "every": "1m",
            "offset": "0s",
            "status": "active",
            "check": {
                "type": "threshold",
                "checkID": check_id
            },
            "endpointID": endpoint_id,
            "statusRules": [
                {
                    "currentLevel": "OK",
                    "previousLevel": "CRITICAL",
                    "notify": True
                }
            ]
        }

        url = f"{self.base_url}/api/v2/notificationRules"
        response = requests.post(url, headers=self.headers, json=rule_data)

        if response.status_code == 201:
            print(f"✅ Notification rule '{rule_name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create notification rule: {response.text}")
            return None

# Usage example
def setup_monitoring():
    """Setup complete monitoring system"""
    alert_manager = InfluxDBAlertManager(
        "http://localhost:8086",
        "your-token",
        "your-org"
    )

    # Create custom checks
    high_cpu_query = '''
    from(bucket: "monitoring")
        |> range(start: -5m)
        |> filter(fn: (r) => r._measurement == "cpu")
        |> filter(fn: (r) => r._field == "usage_active")
        |> filter(fn: (r) => r._value > 80.0)
    '''

    alert_manager.create_check(
        "High CPU Usage",
        high_cpu_query,
        "1m",
        "30s"
    )

    # Create notification endpoints
    slack_endpoint = alert_manager.create_slack_endpoint(
        "DevOps Alerts",
        "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
        "#monitoring"
    )

    if slack_endpoint:
        # Create notification rule
        alert_manager.create_notification_rule(
            "CPU Alert Rule",
            "check-id-here",  # Replace with actual check ID
            slack_endpoint["id"]
        )

if __name__ == "__main__":
    setup_monitoring()
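
# Note: the notification rule above uses a placeholder check ID. In practice
# the ID comes from the JSON that create_check() returns, e.g. (sketch):
#
#     check = alert_manager.create_check("High CPU Usage", high_cpu_query, "1m")
#     if check and slack_endpoint:
#         alert_manager.create_notification_rule(
#             "CPU Alert Rule", check["id"], slack_endpoint["id"]
#         )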

💻 InfluxDB Flux Queries flux

🟡 intermediate ⭐⭐⭐⭐

Advanced Flux query language examples for complex data analysis and transformations

⏱️ 30 min 🏷️ influxdb, flux, queries, analytics, time series
Prerequisites: Basic InfluxDB operations, Understanding of data analysis concepts
// InfluxDB Flux Query Examples
// Advanced data analysis and transformations

// 1. Basic Query Patterns

// Query recent data from specific measurement
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> sort(columns: ["_time"])
    |> limit(n: 100)

// Query with multiple field filters
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage" or r._field == "memory_usage")
    |> filter(fn: (r) => r.host == "web-server-01")
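
// Tag filters also accept regular expressions, which is useful when values
// follow a naming scheme (the pattern below is illustrative):
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r.host =~ /^web-server-\d+$/)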

// 2. Aggregation and Windowing

// Time-based aggregation with mean, min, max
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> aggregateWindow(
        every: 1h,
        fn: mean,
        createEmpty: false
    )
    |> yield(name: "hourly_avg")

// Multiple aggregations from a single base stream; each yield() returns a
// separate named result
data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")

data
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> set(key: "_field", value: "temperature_mean")
    |> yield(name: "mean")

data
    |> aggregateWindow(every: 1h, fn: max, createEmpty: false)
    |> set(key: "_field", value: "temperature_max")
    |> yield(name: "max")

data
    |> aggregateWindow(every: 1h, fn: min, createEmpty: false)
    |> set(key: "_field", value: "temperature_min")
    |> yield(name: "min")

// 3. Data Transformation

// Unit conversion - Celsius to Fahrenheit
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 9.0 / 5.0 + 32.0,
        _field: "temperature_fahrenheit"
    }))

// Calculate percentage from decimal
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        _field: "cpu_usage_percent"
    }))

// 4. Filtering and Thresholding

// Filter by value thresholds
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> filter(fn: (r) => r._value > 30.0 or r._value < 10.0)

// Filter by time windows (business hours only)
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> hourSelection(start: 9, stop: 17)

// 5. Joins and Relations

// Join temperature and humidity data
temperature = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> rename(columns: {_value: "temperature"})

humidity = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "humidity")
    |> rename(columns: {_value: "humidity"})

join(
    tables: {t1: temperature, t2: humidity},
    on: ["_time", "location", "sensor_id"],
    method: "inner"
)
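
// The joined result carries both fields on one row, so they can be combined
// directly; a simple derived column (the column name is illustrative):
join(
    tables: {t1: temperature, t2: humidity},
    on: ["_time", "location", "sensor_id"],
    method: "inner"
)
    |> map(fn: (r) => ({r with temp_humidity_ratio: r.temperature / r.humidity}))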

// 6. Mathematical Operations

// Calculate moving average
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> movingAverage(n: 10)

// Calculate rate of change (derivative)
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "memory_usage")
    |> derivative(unit: 1m, nonNegative: true)

// Calculate difference between consecutive points
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "counters")
    |> filter(fn: (r) => r._field == "request_count")
    |> difference(nonNegative: true)

// 7. Statistical Analysis

// Calculate percentiles (Flux exposes these through quantile(); q is a
// fraction between 0 and 1)
perc = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "response_time")

perc
    |> aggregateWindow(
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: 0.50, column: column),
        createEmpty: false
    )
    |> set(key: "_field", value: "response_time_p50")
    |> yield(name: "p50")

perc
    |> aggregateWindow(
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: 0.95, column: column),
        createEmpty: false
    )
    |> set(key: "_field", value: "response_time_p95")
    |> yield(name: "p95")

perc
    |> aggregateWindow(
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: 0.99, column: column),
        createEmpty: false
    )
    |> set(key: "_field", value: "response_time_p99")
    |> yield(name: "p99")

// Calculate standard deviation
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> stddev()

// 8. Time-based Analysis

// Analyze patterns by hour of day (date.hour() extracts the hour from _time)
import "date"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with hour: date.hour(t: r._time)}))
    |> group(columns: ["hour"])
    |> mean()

// Day of week analysis (date.weekDay() returns 0 = Sunday .. 6 = Saturday)
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "throughput")
    |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with day: date.weekDay(t: r._time)}))
    |> group(columns: ["day"])
    |> mean()

// 9. Custom Functions

// Custom function for anomaly detection: keep only points more than
// `threshold` standard deviations away from the mean of the input
anomalies = (tables=<-, threshold=2.0) => {
    m = (tables |> mean() |> findRecord(fn: (key) => true, idx: 0))._value
    s = (tables |> stddev() |> findRecord(fn: (key) => true, idx: 0))._value

    return tables
        |> filter(fn: (r) => r._value > m + threshold * s or r._value < m - threshold * s)
}

// Use custom function
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> anomalies(threshold: 2.0)

// 10. Performance Monitoring Queries

// Calculate SLA compliance
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "response_time")
    |> map(fn: (r) => ({
        r with
        _value: if r._value <= 500.0 then 1.0 else 0.0,
        _field: "sla_compliant"
    }))
    |> aggregateWindow(
        every: 1h,
        fn: mean,
        createEmpty: false
    )
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        _field: "sla_compliance_percent"
    }))

// Error rate calculation
total_requests = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "request_count")
    |> sum()

error_requests = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "error_count")
    |> sum()

// sum() drops _time, so join the two totals on the query range bounds;
// float() avoids integer division on integer count fields
join(
    tables: {t1: total_requests, t2: error_requests},
    on: ["_start", "_stop"],
    method: "inner"
)
    |> map(fn: (r) => ({
        r with
        _value: float(v: r._value_t2) / float(v: r._value_t1) * 100.0,
        _field: "error_rate_percent"
    }))

// 11. Capacity Planning

// Growth rate calculation
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "storage_metrics")
    |> filter(fn: (r) => r._field == "disk_usage")
    |> aggregateWindow(
        every: 1d,
        fn: mean,
        createEmpty: false
    )
    |> derivative(unit: 1d)

// Predict future usage: fill gaps on a regular grid, then forecast the next
// 24 hours with holtWinters(), Flux's built-in forecasting function
import "interpolate"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> interpolate.linear(every: 1h)
    |> holtWinters(n: 24, interval: 1h)

// 12. Complex Data Analysis

// Correlation analysis between metrics
cpu_data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

memory_data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "memory_usage")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

join(
    tables: {cpu: cpu_data, memory: memory_data},
    on: ["_time"],
    method: "inner"
)
    |> map(fn: (r) => ({
        r with
        cpu_memory_ratio: r._value_cpu / r._value_memory
    }))

// Heatmap data preparation: average temperature per (day, hour, location) cell
from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> map(fn: (r) => ({r with hour: date.hour(t: r._time), day: date.weekDay(t: r._time)}))
    |> group(columns: ["day", "hour", "location"])
    |> mean()
    |> group()
    |> sort(columns: ["day", "hour"])