InfluxDB Database Samples

InfluxDB time series database examples including data modeling, queries, and monitoring

Key Facts

Category
Database
Items
3
Format Families
yaml

Sample Overview

InfluxDB time series database examples including data modeling, queries, and monitoring This sample set belongs to Database and can be used to test related workflows inside Elysia Tools.

💻 InfluxDB Basic Operations python

🟢 simple ⭐⭐

Fundamental operations with InfluxDB including data writing, reading, and basic queries

⏱️ 20 min 🏷️ influxdb, time series, iot, monitoring, python
Prerequisites: Python programming, Basic understanding of time series data
# InfluxDB Basic Operations
# Python - Using influxdb-client library

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
import time
import random

# Configuration
url = "http://localhost:8086"
token = "your-token-here"
org = "your-org"
bucket = "example-bucket"

# Initialize client
class InfluxDBManager:
    def __init__(self, url, token, org):
        self.url = url
        self.token = token
        self.org = org
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    # 1. Writing Data Points
    def write_single_point(self, measurement, tags, fields, timestamp=None):
        """Write a single data point"""
        point = Point(measurement)

        # Add tags
        for key, value in tags.items():
            point.tag(key, value)

        # Add fields
        for key, value in fields.items():
            point.field(key, value)

        # Set timestamp
        if timestamp:
            point.time(timestamp, WritePrecision.NS)

        try:
            self.write_api.write(bucket=bucket, record=point)
            print(f"✅ Written point to {measurement}")
        except Exception as e:
            print(f"❌ Error writing point: {e}")

    def write_multiple_points(self, points):
        """Write multiple data points at once"""
        try:
            self.write_api.write(bucket=bucket, record=points)
            print(f"✅ Written {len(points)} points")
        except Exception as e:
            print(f"❌ Error writing points: {e}")

    # 2. Data Modeling Examples
    def write_sensor_data(self):
        """Write IoT sensor data"""
        # Temperature sensor readings
        for i in range(10):
            temperature = 20 + random.uniform(-5, 15)
            humidity = 60 + random.uniform(-20, 30)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i*5)

            point = Point("environmental")                 .tag("location", "office")                 .tag("sensor_id", "temp_001")                 .tag("room", "conference")                 .field("temperature", temperature)                 .field("humidity", humidity)                 .time(timestamp, WritePrecision.NS)

            self.write_api.write(bucket=bucket, record=point)

        # CPU and memory metrics
        for i in range(20):
            cpu_usage = random.uniform(20, 90)
            memory_usage = random.uniform(30, 85)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i)

            point = Point("system_metrics")                 .tag("host", "web-server-01")                 .tag("region", "us-west-1")                 .tag("environment", "production")                 .field("cpu_usage", cpu_usage)                 .field("memory_usage", memory_usage)                 .field("disk_usage", random.uniform(40, 70))                 .time(timestamp, WritePrecision.NS)

            self.write_api.write(bucket=bucket, record=point)

    def write_application_metrics(self):
        """Write application performance metrics"""
        for i in range(15):
            response_time = random.uniform(100, 2000)
            error_count = random.choices([0, 1, 2, 3], weights=[70, 20, 8, 2])[0]
            active_users = random.randint(100, 500)

            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i*2)

            point = Point("application_metrics")                 .tag("service", "user-service")                 .tag("endpoint", "/api/users")                 .tag("version", "v1.2.3")                 .field("response_time", response_time)                 .field("error_count", error_count)                 .field("active_users", active_users)                 .field("throughput", random.uniform(50, 200))                 .time(timestamp, WritePrecision.NS)

            self.write_api.write(bucket=bucket, record=point)

    # 3. Querying Data
    def query_recent_data(self, measurement, duration="1h"):
        """Query recent data from a measurement"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> limit(n: 10)
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            records = []

            for table in result:
                for record in table.records:
                    records.append({
                        'time': record.get_time(),
                        'field': record.get_field(),
                        'value': record.get_value(),
                        'measurement': record.get_measurement(),
                        'tags': record.values
                    })

            print(f"📊 Found {len(records)} records from {measurement}")
            return records
        except Exception as e:
            print(f"❌ Error querying data: {e}")
            return []

    def query_by_tags(self, measurement, tag_filters, duration="24h"):
        """Query data with specific tag filters"""
        tag_conditions = []
        for key, value in tag_filters.items():
            tag_conditions.append(f'r.{key} == "{value}"')

        tag_filter = ' and '.join(tag_conditions)

        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> filter(fn: (r) => {tag_filter})
            |> sort(columns: ["_time"])
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying by tags: {e}")
            return []

    def query_aggregated_data(self, measurement, field, aggregation="mean", window="5m", duration="1h"):
        """Query aggregated data"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> filter(fn: (r) => r._field == "{field}")
            |> aggregateWindow(every: {window}, fn: {aggregation}, createEmpty: false)
            |> yield(name: "{aggregation}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying aggregated data: {e}")
            return []

    def _process_query_result(self, result):
        """Process query result into list of dictionaries"""
        records = []
        for table in result:
            for record in table.records:
                records.append({
                    'time': record.get_time(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    'measurement': record.get_measurement(),
                    'tags': {k: v for k, v in record.values.items() if not k.startswith('_')}
                })
        return records

    # 4. Advanced Queries
    def query_downsampled_data(self, measurement, duration="24h", window="1h"):
        """Query downsampled data with multiple aggregations"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> aggregateWindow(
                every: {window},
                fn: (tables=<-, column="_value") =>
                    tables
                        |> mean(column: column)
                        |> set(key: "_field", value: "mean")
                        |> yield(name: "mean"),
                createEmpty: false
            )
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying downsampled data: {e}")
            return []

    def query_with_transformations(self):
        """Query with data transformations"""
        query = '''
        from(bucket: "example-bucket")
            |> range(start: -1h)
            |> filter(fn: (r) => r._measurement == "system_metrics")
            |> filter(fn: (r) => r._field == "cpu_usage")
            |> map(fn: (r) => ({ r with _value: r._value / 100.0 }))
            |> filter(fn: (r) => r._value > 0.8)
            |> sort(columns: ["_value"], desc: true)
            |> limit(n: 5)
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error with transformed query: {e}")
            return []

    # 5. Batch Operations
    def batch_write_sensor_data(self, num_points=100):
        """Write multiple sensor data points in batch"""
        points = []
        locations = ["office", "warehouse", "factory", "datacenter"]

        for i in range(num_points):
            location = random.choice(locations)
            temperature = 20 + random.uniform(-10, 20)
            humidity = 50 + random.uniform(-30, 40)
            pressure = 1000 + random.uniform(-50, 50)

            point = Point("environmental_sensors")                 .tag("location", location)                 .tag("sensor_type", "weather_station")                 .field("temperature", temperature)                 .field("humidity", humidity)                 .field("pressure", pressure)                 .time(datetime.datetime.now() - datetime.timedelta(seconds=i*30), WritePrecision.NS)

            points.append(point)

        self.write_multiple_points(points)
        return points

# Usage Examples
def demonstrate_basic_operations():
    """Demonstrate basic InfluxDB operations"""

    with InfluxDBManager(url, token, org) as db:
        print("🚀 Starting InfluxDB Basic Operations Demo")

        # 1. Write single data points
        print("\n📝 Writing single data points...")
        db.write_single_point(
            measurement="test_data",
            tags={"source": "demo", "version": "1.0"},
            fields={"value": 42.5, "status": "ok"}
        )

        # 2. Write sensor data
        print("\n🌡️ Writing sensor data...")
        db.write_sensor_data()

        # 3. Write application metrics
        print("\n📊 Writing application metrics...")
        db.write_application_metrics()

        # 4. Query recent data
        print("\n🔍 Querying recent data...")
        recent_data = db.query_recent_data("environmental", "30m")
        for record in recent_data[:5]:
            print(f"  {record['time']}: {record['field']} = {record['value']}")

        # 5. Query with tag filters
        print("\n🏷️ Querying with tag filters...")
        filtered_data = db.query_by_tags(
            measurement="system_metrics",
            tag_filters={"host": "web-server-01", "environment": "production"},
            duration="1h"
        )
        print(f"Found {len(filtered_data)} records with specified tags")

        # 6. Query aggregated data
        print("\n📈 Querying aggregated data...")
        avg_cpu = db.query_aggregated_data(
            measurement="system_metrics",
            field="cpu_usage",
            aggregation="mean",
            window="10m",
            duration="1h"
        )
        print(f"Average CPU usage points: {len(avg_cpu)}")

        # 7. Batch write
        print("\n💾 Batch writing sensor data...")
        batch_points = db.batch_write_sensor_data(50)
        print(f"Batch written {len(batch_points)} points")

# 6. Data Exploration
class InfluxDBExplorer:
    def __init__(self, url, token, org):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.query_api = self.client.query_api()

    def list_measurements(self, bucket="example-bucket"):
        """List all measurements in a bucket"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.measurements(bucket: "{bucket}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            measurements = [record.get_value() for table in result for record in table.records]
            print(f"📋 Measurements in bucket '{bucket}': {measurements}")
            return measurements
        except Exception as e:
            print(f"❌ Error listing measurements: {e}")
            return []

    def show_tag_keys(self, measurement, bucket="example-bucket"):
        """Show all tag keys for a measurement"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.tagKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            tag_keys = [record.get_value() for table in result for record in table.records]
            print(f"🏷️ Tag keys for '{measurement}': {tag_keys}")
            return tag_keys
        except Exception as e:
            print(f"❌ Error showing tag keys: {e}")
            return []

    def show_field_keys(self, measurement, bucket="example-bucket"):
        """Show all field keys for a measurement"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.fieldKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            field_keys = [record.get_value() for table in result for record in table.records]
            print(f"🔢 Field keys for '{measurement}': {field_keys}")
            return field_keys
        except Exception as e:
            print(f"❌ Error showing field keys: {e}")
            return []

    def get_bucket_stats(self, bucket="example-bucket"):
        """Get basic statistics for a bucket"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -24h)
            |> group(columns: ["_measurement"])
            |> count()
        '''

        try:
            result = self.query_api.query(query, org=self.org)
            stats = {}
            for table in result:
                measurement = table.records[0].values.get("_measurement")
                count = table.records[0].get_value()
                stats[measurement] = count

            print(f"📊 Bucket statistics for '{bucket}':")
            for measurement, count in stats.items():
                print(f"  {measurement}: {count} points")

            return stats
        except Exception as e:
            print(f"❌ Error getting bucket stats: {e}")
            return {}

def demonstrate_exploration():
    """Demonstrate data exploration capabilities"""
    with InfluxDBExplorer(url, token, org) as explorer:
        print("\n🔍 Exploring InfluxDB Data Structure")

        measurements = explorer.list_measurements()
        if measurements:
            for measurement in measurements[:3]:  # Show first 3
                explorer.show_tag_keys(measurement)
                explorer.show_field_keys(measurement)

        explorer.get_bucket_stats()

if __name__ == "__main__":
    # Run the demonstrations
    demonstrate_basic_operations()
    demonstrate_exploration()

💻 InfluxDB Monitoring and Alerts yaml

🟢 simple ⭐⭐⭐⭐

Setting up monitoring dashboards, alerting rules, and notification systems with InfluxDB

⏱️ 35 min 🏷️ influxdb, monitoring, alerting, devops, telegraf
Prerequisites: Advanced InfluxDB operations, System administration, Monitoring concepts
# InfluxDB Monitoring and Alerting
# Configuration files and examples for setting up monitoring systems

# 1. Telegraf Configuration for System Monitoring
# /etc/telegraf/telegraf.conf

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"

# Output to InfluxDB
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-auth-token"
  organization = "your-org"
  bucket = "monitoring"

# System input plugins
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false
  core_tags = true

[[inputs.mem]]
  fieldpass = ["available", "available_percent", "buffered", "cached", "free", "total", "used", "used_percent"]

[[inputs.disk]]
  mount_points = ["/"]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]

[[inputs.diskio]]

[[inputs.kernel]]
  # No configuration needed, collects kernel metrics

[[inputs.processes]]
  collect_parent_pid = true

[[inputs.swap]]
  fieldpass = ["used", "used_percent", "free", "total"]

[[inputs.system]]
  fieldpass = ["uptime_format", "load1", "load5", "load15"]

# Network monitoring
[[inputs.net]]
  fieldpass = ["bytes_sent", "bytes_recv", "packets_sent", "packets_recv", "err_in", "err_out", "drop_in", "drop_out"]

[[inputs.netstat]]
  fieldpass = ["tcp_listen", "tcp_established", "tcp_close_wait", "tcp_time_wait", "udp_listen"]

# Application monitoring
[[inputs.prometheus]]
  urls = ["http://localhost:9090/metrics"]
  metric_version = 2

[[inputs.http_response]]
  urls = ["http://localhost:8080/health", "http://localhost:3000/api/health"]
  timeout = "5s"
  method = "GET"
  follow_redirects = true
  response_timeout = "5s"
  response_string_match = "ok"

# Docker monitoring
[[inputs.docker]]
  endpoint = "unix:///var/run/docker.sock"
  container_names = []
  timeout = "5s"
  perdevice = true
  total = true
  docker_label_include = []

# 2. InfluxDB 2.x Alerting Configuration
# alert-rules.yaml

apiVersion: 1
groups:
  - name: system_alerts
    rules:
      - alert: HighCPUUsage
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "cpu")
            |> filter(fn: (r) => r._field == "usage_active")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 80.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High CPU usage detected: {{ index .Tags "host" }} has {{ .Values.C }}% CPU usage"
        labels:
          severity: warning
          service: system

      - alert: HighMemoryUsage
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "mem")
            |> filter(fn: (r) => r._field == "used_percent")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 85.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High memory usage on {{ index .Tags "host" }}: {{ .Values.C }}% used"
        labels:
          severity: warning
          service: system

      - alert: DiskSpaceLow
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "disk")
            |> filter(fn: (r) => r._field == "used_percent")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 90.0)
        status: OK
        statusRules:
          - current: [OK, Critical]
            color: "red"
        message: "Disk space critically low on {{ index .Tags "host" }}/{{ index .Tags "path" }}: {{ .Values.C }}% used"
        labels:
          severity: critical
          service: system

  - name: application_alerts
    rules:
      - alert: HighErrorRate
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "application_metrics")
            |> filter(fn: (r) => r._field == "error_rate")
            |> filter(fn: (r) => r._value > 5.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High error rate in {{ index .Tags "service" }}: {{ .Values.C }}% errors"
        labels:
          severity: warning
          service: application

      - alert: HighResponseTime
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "application_metrics")
            |> filter(fn: (r) => r._field == "response_time_p95")
            |> filter(fn: (r) => r._value > 1000.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High response time in {{ index .Tags "service" }}: {{ .Values.C }}ms P95"
        labels:
          severity: warning
          service: application

      - alert: ServiceDown
        type: threshold
        thresholds:
          - value: 0
            level: critical
        message: "Service {{ index .Tags "url" }} is down"
        labels:
          severity: critical
          service: availability

  - name: custom_threshold_alerts
    rules:
      - alert: TemperatureHigh
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -2m)
            |> filter(fn: (r) => r._measurement == "sensor_data")
            |> filter(fn: (r) => r._field == "temperature")
            |> filter(fn: (r) => r._value > 35.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: "High temperature alert: {{ index .Tags "location" }} sensor reports {{ .Values.C }}°C"
        labels:
          severity: warning
          service: environmental

# 3. Notification Endpoints Configuration
# /etc/influxdb2/notifiers.yaml

apiVersion: 1
notifiers:
  - name: SlackNotification
    type: slack
    url: https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
    username: InfluxDBAlerts
    channel: "#alerts"
    color: critical
    template_path: /etc/influxdb2/templates/slack.tmpl

  - name: EmailNotification
    type: smtp
    username: [email protected]
    password: your-email-password
    host: smtp.gmail.com
    port: 587
    from: [email protected]
    to: "[email protected],[email protected]"
    subject: "InfluxDB Alert: {{ .Title }}"
    template_path: /etc/influxdb2/templates/email.tmpl

  - name: PagerDutyNotification
    type: pagerduty
    serviceKey: your-pagerduty-service-key
    severity: critical
    details:
      firing: "{{ .State }}"
      threshold: "{{ .Thresholds }}"

  - name: WebhookNotification
    type: http
    url: http://your-webhook-endpoint/alerts
    method: POST
    headers:
      Authorization: "Bearer your-token"
      Content-Type: "application/json"
    body: |
      {
        "alert": "{{ .Title }}",
        "message": "{{ .Message }}",
        "severity": "{{ index .Labels "severity" }}",
        "timestamp": "{{ .Time }}",
        "tags": {{ .Tags | toJSON }}
      }

# 4. Notification Templates
# /etc/influxdb2/templates/slack.tmpl

{{ range .Alerts }}
{{ if eq .Status "firing" }}
🚨 **ALERT: {{ .Title }}**
• Message: {{ .Message }}
• Severity: {{ index .Labels "severity" }}
• Time: {{ .Time }}
• Tags: {{ range $key, $value := .Tags }}{{ $key }}={{ $value }} {{ end }}
{{ end }}
{{ if eq .Status "resolved" }}
✅ **RESOLVED: {{ .Title }}**
• Message: {{ .Message }}
• Resolved at: {{ .Time }}
{{ end }}
{{ end }}

# /etc/influxdb2/templates/email.tmpl

Subject: InfluxDB Alert: {{ .Title }}

{{ range .Alerts }}
{{ if eq .Status "firing" }}
ALERT FIRED

Title: {{ .Title }}
Message: {{ .Message }}
Severity: {{ index .Labels "severity" }}
Service: {{ index .Labels "service" }}
Time: {{ .Time }}
Tags:
{{ range $key, $value := .Tags }}
- {{ $key }}: {{ $value }}
{{ end }}

{{ end }}
{{ if eq .Status "resolved" }}
ALERT RESOLVED

Title: {{ .Title }}
Message: {{ .Message }}
Resolved at: {{ .Time }}
Duration: {{ .Duration }}

{{ end }}
{{ end }}

# 5. Grafana Dashboard Configuration
# grafana-dashboard.json (excerpt)

{
  "dashboard": {
    "title": "System Monitoring Dashboard",
    "tags": ["system", "influxdb", "telegraf"],
    "timezone": "browser",
    "panels": [
      {
        "title": "CPU Usage",
        "type": "stat",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"cpu\") |> filter(fn: (r) => r._field == \"usage_active\") |> last()"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 70},
                {"color": "red", "value": 90}
              ]
            }
          }
        }
      },
      {
        "title": "Memory Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"mem\") |> filter(fn: (r) => r._field == \"used_percent\")"
          }
        ],
        "yAxes": [
          {
            "max": 100,
            "min": 0,
            "unit": "percent"
          }
        ]
      },
      {
        "title": "Disk Usage by Mount Point",
        "type": "piechart",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"disk\") |> filter(fn: (r) => r._field == \"used_percent\") |> last()"
          }
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "30s"
  }
}

# 6. Python Alert Manager Integration
# alert_manager.py

import requests
import json
import time
from datetime import datetime
from typing import Dict, List, Optional

class InfluxDBAlertManager:
    def __init__(self, influx_url: str, token: str, org: str):
        self.base_url = influx_url
        self.token = token
        self.org = org
        self.headers = {
            "Authorization": f"Token {token}",
            "Content-Type": "application/json"
        }

    def create_check(self, check_name: str, query: str, every: str, offset: str = "0s"):
        """Create an alert check in InfluxDB"""
        check_data = {
            "name": check_name,
            "query": query,
            "every": every,
            "offset": offset,
            "status": "active",
            "labels": {
                "source": "custom_alerts"
            }
        }

        url = f"{self.base_url}/api/v2/checks"
        response = requests.post(url, headers=self.headers, json=check_data)

        if response.status_code == 201:
            print(f"✅ Alert check '{check_name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create alert check: {response.text}")
            return None

    def get_alert_endpoints(self):
        """Get all notification endpoints"""
        url = f"{self.base_url}/api/v2/notificationEndpoints"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()["notificationEndpoints"]
        else:
            print(f"❌ Failed to get alert endpoints: {response.text}")
            return []

    def create_slack_endpoint(self, name: str, webhook_url: str, channel: str):
        """Create a Slack notification endpoint"""
        endpoint_data = {
            "name": name,
            "type": "slack",
            "url": webhook_url,
            "username": "InfluxDB Alerts",
            "channel": channel
        }

        url = f"{self.base_url}/api/v2/notificationEndpoints"
        response = requests.post(url, headers=self.headers, json=endpoint_data)

        if response.status_code == 201:
            print(f"✅ Slack endpoint '{name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create Slack endpoint: {response.text}")
            return None

    def create_notification_rule(self, rule_name: str, check_id: str, endpoint_id: str):
        """Create a notification rule"""
        rule_data = {
            "name": rule_name,
            "every": "1m",
            "offset": "0s",
            "status": "active",
            "check": {
                "type": "threshold",
                "checkID": check_id
            },
            "endpointID": endpoint_id,
            "statusRules": [
                {
                    "currentLevel": "OK",
                    "previousLevel": "CRITICAL",
                    "notify": True
                }
            ]
        }

        url = f"{self.base_url}/api/v2/notificationRules"
        response = requests.post(url, headers=self.headers, json=rule_data)

        if response.status_code == 201:
            print(f"✅ Notification rule '{rule_name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create notification rule: {response.text}")
            return None

# Usage example
def setup_monitoring():
    """Setup complete monitoring system"""
    alert_manager = InfluxDBAlertManager(
        "http://localhost:8086",
        "your-token",
        "your-org"
    )

    # Create custom checks
    high_cpu_query = '''
    from(bucket: "monitoring")
        |> range(start: -5m)
        |> filter(fn: (r) => r._measurement == "cpu")
        |> filter(fn: (r) => r._field == "usage_active")
        |> filter(fn: (r) => r._value > 80.0)
    '''

    alert_manager.create_check(
        "High CPU Usage",
        high_cpu_query,
        "1m",
        "30s"
    )

    # Create notification endpoints
    slack_endpoint = alert_manager.create_slack_endpoint(
        "DevOps Alerts",
        "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
        "#monitoring"
    )

    if slack_endpoint:
        # Create notification rule
        alert_manager.create_notification_rule(
            "CPU Alert Rule",
            "check-id-here",  # Replace with actual check ID
            slack_endpoint["id"]
        )

if __name__ == "__main__":
    setup_monitoring()

💻 InfluxDB Flux Queries flux

🟡 intermediate ⭐⭐⭐⭐

Advanced Flux query language examples for complex data analysis and transformations

⏱️ 30 min 🏷️ influxdb, flux, queries, analytics, time series
Prerequisites: Basic InfluxDB operations, Understanding of data analysis concepts
// InfluxDB Flux Query Examples
// Advanced data analysis and transformations

// 1. Basic Query Patterns

// Query recent data from specific measurement
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> sort(columns: ["_time"])
    |> limit(n: 100)

// Query with multiple field filters
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage" or r._field == "memory_usage")
    |> filter(fn: (r) => r.host == "web-server-01")

// 2. Aggregation and Windowing

// Time-based aggregation with mean, min, max
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> aggregateWindow(
        every: 1h,
        fn: mean,
        createEmpty: false
    )
    |> yield(name: "hourly_avg")

// Multiple aggregations
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> aggregateWindow(
        every: 1h,
        fn: (tables=<-, column="_value") => tables
            |> mean(column: column)
            |> map(fn: (r) => ({r with _field: "temperature_mean"}))
            |> yield(name: "mean"),
        createEmpty: false
    )
    |> aggregateWindow(
        every: 1h,
        fn: (tables=<-, column="_value") => tables
            |> max(column: column)
            |> map(fn: (r) => ({r with _field: "temperature_max"}))
            |> yield(name: "max"),
        createEmpty: false
    )
    |> aggregateWindow(
        every: 1h,
        fn: (tables=<-, column="_value") => tables
            |> min(column: column)
            |> map(fn: (r) => ({r with _field: "temperature_min"}))
            |> yield(name: "min"),
        createEmpty: false
    )

// 3. Data Transformation

// Unit conversion - Celsius to Fahrenheit
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 9.0 / 5.0 + 32.0,
        _field: "temperature_fahrenheit"
    }))

// Calculate percentage from decimal
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        _field: "cpu_usage_percent"
    }))

// 4. Filtering and Thresholding

// Filter by value thresholds
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> filter(fn: (r) => r._value > 30.0 or r._value < 10.0)

// Filter by time windows (business hours only)
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> hourSelection(start: 9, stop: 17)

// 5. Joins and Relations

// Join temperature and humidity data
temperature = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> rename(columns: {_value: "temperature"})

humidity = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "humidity")
    |> rename(columns: {_value: "humidity"})

join(
    tables: {t1: temperature, t2: humidity},
    on: ["_time", "location", "sensor_id"],
    method: "inner"
)

// 6. Mathematical Operations

// Calculate moving average
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> movingAverage(n: 10)

// Calculate rate of change (derivative)
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "memory_usage")
    |> derivative(unit: 1m, nonNegative: true)

// Calculate difference between consecutive points
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "counters")
    |> filter(fn: (r) => r._field == "request_count")
    |> difference(nonNegative: true)

// 7. Statistical Analysis

// Calculate percentiles
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "response_time")
    |> aggregateWindow(
        every: 1h,
        fn: (tables=<-, column="_value") => tables
            |> percentile(column: column, percentile: 0.50)
            |> set(key: "_field", value: "response_time_p50")
            |> yield(name: "p50"),
        createEmpty: false
    )
    |> aggregateWindow(
        every: 1h,
        fn: (tables=<-, column="_value") => tables
            |> percentile(column: column, percentile: 0.95)
            |> set(key: "_field", value: "response_time_p95")
            |> yield(name: "p95"),
        createEmpty: false
    )
    |> aggregateWindow(
        every: 1h,
        fn: (tables=<-, column="_value") => tables
            |> percentile(column: column, percentile: 0.99)
            |> set(key: "_field", value: "response_time_p99")
            |> yield(name: "p99"),
        createEmpty: false
    )

// Calculate standard deviation
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> stddev()

// 8. Time-based Analysis

// Analyze patterns by hour of day
from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> group(columns: ["_measurement"])
    |> aggregateWindow(
        every: 1h,
        fn: mean,
        createEmpty: false
    )
    |> hour()
    |> group(columns: ["_time"])
    |> mean()

// Day of week analysis
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "throughput")
    |> group(columns: ["_measurement"])
    |> aggregateWindow(
        every: 1d,
        fn: mean,
        createEmpty: false
    )
    |> dayOfWeek()
    |> group(columns: ["_time"])
    |> mean()

// 9. Custom Functions

// Custom function for anomaly detection
anomalies = (table=<-, threshold=2.0) =>
    table
        |> mean(column: "_value")
        |> map(fn: (r) => ({r with _value: r._value}))
        |> stddev()
        |> map(fn: (r) => ({r with upper_bound: r._value + threshold * r._value}))
        |> map(fn: (r) => ({r with lower_bound: r._value - threshold * r._value}))

// Use custom function
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> anomalies(threshold: 2.0)

// 10. Performance Monitoring Queries

// Calculate SLA compliance
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "response_time")
    |> map(fn: (r) => ({
        r with
        _value: if r._value <= 500 then 1.0 else 0.0,
        _field: "sla_compliant"
    }))
    |> aggregateWindow(
        every: 1h,
        fn: mean,
        createEmpty: false
    )
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        _field: "sla_compliance_percent"
    }))

// Error rate calculation
total_requests = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "request_count")
    |> sum()

error_requests = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "error_count")
    |> sum()

join(
    tables: {t1: total_requests, t2: error_requests},
    on: ["_time"],
    method: "inner"
)
    |> map(fn: (r) => ({
        r with
        _value: r._value_t2 / r._value_t1 * 100.0,
        _field: "error_rate_percent"
    }))

// 11. Capacity Planning

// Growth rate calculation
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "storage_metrics")
    |> filter(fn: (r) => r._field == "disk_usage")
    |> aggregateWindow(
        every: 1d,
        fn: mean,
        createEmpty: false
    )
    |> derivative(unit: 1d)

// Predict future usage
from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> linearInterpolate(every: 1h)
    |> extrapolate(
        every: 1h,
        duration: 24h
    )

// 12. Complex Data Analysis

// Correlation analysis between metrics
cpu_data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

memory_data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "memory_usage")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

join(
    tables: {cpu: cpu_data, memory: memory_data},
    on: ["_time"],
    method: "inner"
)
    |> map(fn: (r) => ({
        r with
        cpu_memory_ratio: r._value_cpu / r._value_memory
    }))

// Heatmap data preparation
from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> hour()
    |> dayOfWeek()
    |> group(columns: ["_time", "location"])
    |> mean()
    |> sort(columns: ["_time"])