🎯 Recommended Examples
Balanced sample collections from various categories for you to explore
InfluxDB Examples
InfluxDB time series database examples covering data modeling, queries, and monitoring
💻 InfluxDB Basic Operations python
🟢 simple
⭐⭐
Fundamental InfluxDB operations, including writing data, reading it back, and running basic queries
⏱️ 20 min
🏷️ influxdb, time series, iot, monitoring, python
Prerequisites:
Python programming, Basic understanding of time series data
# InfluxDB Basic Operations
# Python - Using influxdb-client library
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
import time
import random
# Configuration
url = "http://localhost:8086"
token = "your-token-here"
org = "your-org"
bucket = "example-bucket"
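# Note: every write below is serialized to InfluxDB line protocol, e.g.
#   environmental,location=office,sensor_id=temp_001 temperature=23.5 1700000000000000000
# As a quick sanity check (a sketch, assuming the influxdb-client library imported
# above), you can inspect what a Point will send with to_line_protocol():
#   p = Point("environmental").tag("location", "office").field("temperature", 23.5)
#   print(p.to_line_protocol())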
# Initialize client
class InfluxDBManager:
    def __init__(self, url, token, org):
        self.url = url
        self.token = token
        self.org = org
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()
    # 1. Writing Data Points
    def write_single_point(self, measurement, tags, fields, timestamp=None):
        """Write a single data point"""
        point = Point(measurement)

        # Add tags
        for key, value in tags.items():
            point.tag(key, value)

        # Add fields
        for key, value in fields.items():
            point.field(key, value)

        # Set timestamp
        if timestamp:
            point.time(timestamp, WritePrecision.NS)

        try:
            self.write_api.write(bucket=bucket, record=point)
            print(f"✅ Written point to {measurement}")
        except Exception as e:
            print(f"❌ Error writing point: {e}")

    def write_multiple_points(self, points):
        """Write multiple data points at once"""
        try:
            self.write_api.write(bucket=bucket, record=points)
            print(f"✅ Written {len(points)} points")
        except Exception as e:
            print(f"❌ Error writing points: {e}")
    # 2. Data Modeling Examples
    def write_sensor_data(self):
        """Write IoT sensor data"""
        # Temperature sensor readings
        for i in range(10):
            temperature = 20 + random.uniform(-5, 15)
            humidity = 60 + random.uniform(-20, 30)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i * 5)

            point = (
                Point("environmental")
                .tag("location", "office")
                .tag("sensor_id", "temp_001")
                .tag("room", "conference")
                .field("temperature", temperature)
                .field("humidity", humidity)
                .time(timestamp, WritePrecision.NS)
            )
            self.write_api.write(bucket=bucket, record=point)

        # CPU and memory metrics
        for i in range(20):
            cpu_usage = random.uniform(20, 90)
            memory_usage = random.uniform(30, 85)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i)

            point = (
                Point("system_metrics")
                .tag("host", "web-server-01")
                .tag("region", "us-west-1")
                .tag("environment", "production")
                .field("cpu_usage", cpu_usage)
                .field("memory_usage", memory_usage)
                .field("disk_usage", random.uniform(40, 70))
                .time(timestamp, WritePrecision.NS)
            )
            self.write_api.write(bucket=bucket, record=point)

    def write_application_metrics(self):
        """Write application performance metrics"""
        for i in range(15):
            response_time = random.uniform(100, 2000)
            error_count = random.choices([0, 1, 2, 3], weights=[70, 20, 8, 2])[0]
            active_users = random.randint(100, 500)
            timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i * 2)

            point = (
                Point("application_metrics")
                .tag("service", "user-service")
                .tag("endpoint", "/api/users")
                .tag("version", "v1.2.3")
                .field("response_time", response_time)
                .field("error_count", error_count)
                .field("active_users", active_users)
                .field("throughput", random.uniform(50, 200))
                .time(timestamp, WritePrecision.NS)
            )
            self.write_api.write(bucket=bucket, record=point)
    # 3. Querying Data
    def query_recent_data(self, measurement, duration="1h"):
        """Query recent data from a measurement"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> limit(n: 10)
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            records = []
            for table in result:
                for record in table.records:
                    records.append({
                        'time': record.get_time(),
                        'field': record.get_field(),
                        'value': record.get_value(),
                        'measurement': record.get_measurement(),
                        'tags': record.values
                    })
            print(f"📊 Found {len(records)} records from {measurement}")
            return records
        except Exception as e:
            print(f"❌ Error querying data: {e}")
            return []

    def query_by_tags(self, measurement, tag_filters, duration="24h"):
        """Query data with specific tag filters"""
        tag_conditions = []
        for key, value in tag_filters.items():
            tag_conditions.append(f'r.{key} == "{value}"')
        tag_filter = ' and '.join(tag_conditions)

        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> filter(fn: (r) => {tag_filter})
            |> sort(columns: ["_time"])
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying by tags: {e}")
            return []

    def query_aggregated_data(self, measurement, field, aggregation="mean", window="5m", duration="1h"):
        """Query aggregated data"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> filter(fn: (r) => r._field == "{field}")
            |> aggregateWindow(every: {window}, fn: {aggregation}, createEmpty: false)
            |> yield(name: "{aggregation}")
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying aggregated data: {e}")
            return []

    def _process_query_result(self, result):
        """Process query result into list of dictionaries"""
        records = []
        for table in result:
            for record in table.records:
                records.append({
                    'time': record.get_time(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    'measurement': record.get_measurement(),
                    'tags': {k: v for k, v in record.values.items() if not k.startswith('_')}
                })
        return records
    # 4. Advanced Queries
    def query_downsampled_data(self, measurement, duration="24h", window="1h"):
        """Downsample a measurement using a custom aggregate function"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -{duration})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> aggregateWindow(
                every: {window},
                fn: (column, tables=<-) => tables |> mean(column: column),
                createEmpty: false
            )
            |> set(key: "_field", value: "mean")
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error querying downsampled data: {e}")
            return []

    def query_with_transformations(self):
        """Query with data transformations"""
        query = '''
        from(bucket: "example-bucket")
            |> range(start: -1h)
            |> filter(fn: (r) => r._measurement == "system_metrics")
            |> filter(fn: (r) => r._field == "cpu_usage")
            |> map(fn: (r) => ({ r with _value: r._value / 100.0 }))
            |> filter(fn: (r) => r._value > 0.8)
            |> sort(columns: ["_value"], desc: true)
            |> limit(n: 5)
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            return self._process_query_result(result)
        except Exception as e:
            print(f"❌ Error with transformed query: {e}")
            return []
    # 5. Batch Operations
    def batch_write_sensor_data(self, num_points=100):
        """Write multiple sensor data points in batch"""
        points = []
        locations = ["office", "warehouse", "factory", "datacenter"]

        for i in range(num_points):
            location = random.choice(locations)
            temperature = 20 + random.uniform(-10, 20)
            humidity = 50 + random.uniform(-30, 40)
            pressure = 1000 + random.uniform(-50, 50)

            point = (
                Point("environmental_sensors")
                .tag("location", location)
                .tag("sensor_type", "weather_station")
                .field("temperature", temperature)
                .field("humidity", humidity)
                .field("pressure", pressure)
                .time(datetime.datetime.now() - datetime.timedelta(seconds=i * 30), WritePrecision.NS)
            )
            points.append(point)

        self.write_multiple_points(points)
        return points
# Usage Examples
def demonstrate_basic_operations():
    """Demonstrate basic InfluxDB operations"""
    with InfluxDBManager(url, token, org) as db:
        print("🚀 Starting InfluxDB Basic Operations Demo")

        # 1. Write single data points
        print("\n📝 Writing single data points...")
        db.write_single_point(
            measurement="test_data",
            tags={"source": "demo", "version": "1.0"},
            fields={"value": 42.5, "status": "ok"}
        )

        # 2. Write sensor data
        print("\n🌡️ Writing sensor data...")
        db.write_sensor_data()

        # 3. Write application metrics
        print("\n📊 Writing application metrics...")
        db.write_application_metrics()

        # 4. Query recent data
        print("\n🔍 Querying recent data...")
        recent_data = db.query_recent_data("environmental", "30m")
        for record in recent_data[:5]:
            print(f"  {record['time']}: {record['field']} = {record['value']}")

        # 5. Query with tag filters
        print("\n🏷️ Querying with tag filters...")
        filtered_data = db.query_by_tags(
            measurement="system_metrics",
            tag_filters={"host": "web-server-01", "environment": "production"},
            duration="1h"
        )
        print(f"Found {len(filtered_data)} records with specified tags")

        # 6. Query aggregated data
        print("\n📈 Querying aggregated data...")
        avg_cpu = db.query_aggregated_data(
            measurement="system_metrics",
            field="cpu_usage",
            aggregation="mean",
            window="10m",
            duration="1h"
        )
        print(f"Average CPU usage points: {len(avg_cpu)}")

        # 7. Batch write
        print("\n💾 Batch writing sensor data...")
        batch_points = db.batch_write_sensor_data(50)
        print(f"Batch written {len(batch_points)} points")
# 6. Data Exploration
class InfluxDBExplorer:
    def __init__(self, url, token, org):
        self.org = org
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.query_api = self.client.query_api()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    def list_measurements(self, bucket="example-bucket"):
        """List all measurements in a bucket"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.measurements(bucket: "{bucket}")
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            measurements = [record.get_value() for table in result for record in table.records]
            print(f"📋 Measurements in bucket '{bucket}': {measurements}")
            return measurements
        except Exception as e:
            print(f"❌ Error listing measurements: {e}")
            return []

    def show_tag_keys(self, measurement, bucket="example-bucket"):
        """Show all tag keys for a measurement"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.tagKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            tag_keys = [record.get_value() for table in result for record in table.records]
            print(f"🏷️ Tag keys for '{measurement}': {tag_keys}")
            return tag_keys
        except Exception as e:
            print(f"❌ Error showing tag keys: {e}")
            return []

    def show_field_keys(self, measurement, bucket="example-bucket"):
        """Show all field keys for a measurement"""
        query = f'''
        import "influxdata/influxdb/schema"
        schema.fieldKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            field_keys = [record.get_value() for table in result for record in table.records]
            print(f"🔢 Field keys for '{measurement}': {field_keys}")
            return field_keys
        except Exception as e:
            print(f"❌ Error showing field keys: {e}")
            return []

    def get_bucket_stats(self, bucket="example-bucket"):
        """Get basic statistics for a bucket"""
        query = f'''
        from(bucket: "{bucket}")
            |> range(start: -24h)
            |> group(columns: ["_measurement"])
            |> count()
        '''
        try:
            result = self.query_api.query(query, org=self.org)
            stats = {}
            for table in result:
                measurement = table.records[0].values.get("_measurement")
                count = table.records[0].get_value()
                stats[measurement] = count
            print(f"📊 Bucket statistics for '{bucket}':")
            for measurement, count in stats.items():
                print(f"  {measurement}: {count} points")
            return stats
        except Exception as e:
            print(f"❌ Error getting bucket stats: {e}")
            return {}
def demonstrate_exploration():
    """Demonstrate data exploration capabilities"""
    with InfluxDBExplorer(url, token, org) as explorer:
        print("\n🔍 Exploring InfluxDB Data Structure")
        measurements = explorer.list_measurements()
        if measurements:
            for measurement in measurements[:3]:  # Show first 3
                explorer.show_tag_keys(measurement)
                explorer.show_field_keys(measurement)
        explorer.get_bucket_stats()

if __name__ == "__main__":
    # Run the demonstrations
    demonstrate_basic_operations()
    demonstrate_exploration()
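The manager above writes synchronously, which blocks on every call. For higher-volume ingestion, the influxdb-client library also supports background batching via WriteOptions. A minimal sketch, reusing the url/token/org/bucket placeholders defined earlier:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import WriteOptions

# Batching write API: points are buffered and flushed in the background,
# which is usually far more efficient than one HTTP request per point.
with InfluxDBClient(url=url, token=token, org=org) as client:
    write_api = client.write_api(write_options=WriteOptions(
        batch_size=500,         # points per batch
        flush_interval=10_000,  # ms between forced flushes
        jitter_interval=2_000,  # ms of random jitter to spread server load
        retry_interval=5_000,   # ms before retrying a failed batch
    ))
    try:
        for i in range(1_000):
            write_api.write(bucket=bucket, record=Point("demo").field("i", i))
    finally:
        write_api.close()  # flush any buffered points before exiting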
💻 InfluxDB Monitoring and Alerting yaml
🟢 simple
⭐⭐⭐⭐
Configuring dashboards, alert rules, and notification systems
⏱️ 35 min
🏷️ influxdb, monitoring, alerting, devops, telegraf
Prerequisites:
Advanced InfluxDB operations, System administration, Monitoring concepts
# InfluxDB Monitoring and Alerting
# Configuration files and examples for setting up monitoring systems
# 1. Telegraf Configuration for System Monitoring
# /etc/telegraf/telegraf.conf
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"

# Output to InfluxDB
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-auth-token"
  organization = "your-org"
  bucket = "monitoring"

# System input plugins
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false
  core_tags = true

[[inputs.mem]]
  fieldpass = ["available", "available_percent", "buffered", "cached", "free", "total", "used", "used_percent"]

[[inputs.disk]]
  mount_points = ["/"]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]

[[inputs.diskio]]

[[inputs.kernel]]
  # No configuration needed, collects kernel metrics

[[inputs.processes]]
  collect_parent_pid = true

[[inputs.swap]]
  fieldpass = ["used", "used_percent", "free", "total"]

[[inputs.system]]
  fieldpass = ["uptime_format", "load1", "load5", "load15"]

# Network monitoring
[[inputs.net]]
  fieldpass = ["bytes_sent", "bytes_recv", "packets_sent", "packets_recv", "err_in", "err_out", "drop_in", "drop_out"]

[[inputs.netstat]]
  fieldpass = ["tcp_listen", "tcp_established", "tcp_close_wait", "tcp_time_wait", "udp_listen"]

# Application monitoring
[[inputs.prometheus]]
  urls = ["http://localhost:9090/metrics"]
  metric_version = 2

[[inputs.http_response]]
  urls = ["http://localhost:8080/health", "http://localhost:3000/api/health"]
  timeout = "5s"
  method = "GET"
  follow_redirects = true
  response_timeout = "5s"
  response_string_match = "ok"

# Docker monitoring
[[inputs.docker]]
  endpoint = "unix:///var/run/docker.sock"
  container_names = []
  timeout = "5s"
  perdevice = true
  total = true
  docker_label_include = []
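Once Telegraf is flushing, a quick way to confirm metrics are arriving is to list the measurements in the monitoring bucket from Python. A sketch, assuming the influxdb-client library from the previous example and the URL/token/org placeholders used in the Telegraf output above:

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="your-auth-token", org="your-org") as client:
    # schema.measurements() lists every measurement Telegraf has written
    tables = client.query_api().query('''
    import "influxdata/influxdb/schema"
    schema.measurements(bucket: "monitoring")
    ''')
    for table in tables:
        for record in table.records:
            print("measurement:", record.get_value())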
# 2. InfluxDB 2.x Alerting Configuration
# alert-rules.yaml
apiVersion: 1
groups:
  - name: system_alerts
    rules:
      - alert: HighCPUUsage
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "cpu")
            |> filter(fn: (r) => r._field == "usage_active")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 80.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: 'High CPU usage detected: {{ index .Tags "host" }} has {{ .Values.C }}% CPU usage'
        labels:
          severity: warning
          service: system
      - alert: HighMemoryUsage
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "mem")
            |> filter(fn: (r) => r._field == "used_percent")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 85.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: 'High memory usage on {{ index .Tags "host" }}: {{ .Values.C }}% used'
        labels:
          severity: warning
          service: system
      - alert: DiskSpaceLow
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "disk")
            |> filter(fn: (r) => r._field == "used_percent")
            |> aggregateWindow(every: 1m, fn: mean)
            |> filter(fn: (r) => r._value > 90.0)
        status: OK
        statusRules:
          - current: [OK, Critical]
            color: "red"
        message: 'Disk space critically low on {{ index .Tags "host" }}/{{ index .Tags "path" }}: {{ .Values.C }}% used'
        labels:
          severity: critical
          service: system
  - name: application_alerts
    rules:
      - alert: HighErrorRate
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "application_metrics")
            |> filter(fn: (r) => r._field == "error_rate")
            |> filter(fn: (r) => r._value > 5.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: 'High error rate in {{ index .Tags "service" }}: {{ .Values.C }}% errors'
        labels:
          severity: warning
          service: application
      - alert: HighResponseTime
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -5m)
            |> filter(fn: (r) => r._measurement == "application_metrics")
            |> filter(fn: (r) => r._field == "response_time_p95")
            |> filter(fn: (r) => r._value > 1000.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: 'High response time in {{ index .Tags "service" }}: {{ .Values.C }}ms P95'
        labels:
          severity: warning
          service: application
      - alert: ServiceDown
        type: threshold
        thresholds:
          - value: 0
            level: critical
        message: 'Service {{ index .Tags "url" }} is down'
        labels:
          severity: critical
          service: availability
  - name: custom_threshold_alerts
    rules:
      - alert: TemperatureHigh
        type: query
        query: |
          from(bucket: "monitoring")
            |> range(start: -2m)
            |> filter(fn: (r) => r._measurement == "sensor_data")
            |> filter(fn: (r) => r._field == "temperature")
            |> filter(fn: (r) => r._value > 35.0)
        status: OK
        statusRules:
          - current: [OK, Warning, Critical]
            color: "green"
          - current: [OK, Warning]
            color: "yellow"
          - current: [OK, Critical]
            color: "red"
        message: 'High temperature alert: {{ index .Tags "location" }} sensor reports {{ .Values.C }}°C'
        labels:
          severity: warning
          service: environmental
# 3. Notification Endpoints Configuration
# /etc/influxdb2/notifiers.yaml
apiVersion: 1
notifiers:
  - name: SlackNotification
    type: slack
    url: https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
    username: InfluxDBAlerts
    channel: "#alerts"
    color: critical
    template_path: /etc/influxdb2/templates/slack.tmpl
  - name: EmailNotification
    type: smtp
    username: [email protected]
    password: your-email-password
    host: smtp.gmail.com
    port: 587
    from: [email protected]
    to: "[email protected],[email protected]"
    subject: "InfluxDB Alert: {{ .Title }}"
    template_path: /etc/influxdb2/templates/email.tmpl
  - name: PagerDutyNotification
    type: pagerduty
    serviceKey: your-pagerduty-service-key
    severity: critical
    details:
      firing: "{{ .State }}"
      threshold: "{{ .Thresholds }}"
  - name: WebhookNotification
    type: http
    url: http://your-webhook-endpoint/alerts
    method: POST
    headers:
      Authorization: "Bearer your-token"
      Content-Type: "application/json"
    body: |
      {
        "alert": "{{ .Title }}",
        "message": "{{ .Message }}",
        "severity": "{{ index .Labels "severity" }}",
        "timestamp": "{{ .Time }}",
        "tags": {{ .Tags | toJSON }}
      }
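The http notifier above posts a JSON body to your webhook endpoint. A minimal sketch of the receiving side, using only the Python standard library (the port and field names simply mirror the payload template above):

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class AlertHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Parse the JSON body produced by the WebhookNotification template
        length = int(self.headers.get("Content-Length", 0))
        alert = json.loads(self.rfile.read(length) or b"{}")
        print(f"[{alert.get('severity', 'unknown')}] {alert.get('alert')}: {alert.get('message')}")
        self.send_response(204)  # acknowledge with no response body
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8000), AlertHandler).serve_forever()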
# 4. Notification Templates
# /etc/influxdb2/templates/slack.tmpl
{{ range .Alerts }}
{{ if eq .Status "firing" }}
🚨 **ALERT: {{ .Title }}**
• Message: {{ .Message }}
• Severity: {{ index .Labels "severity" }}
• Time: {{ .Time }}
• Tags: {{ range $key, $value := .Tags }}{{ $key }}={{ $value }} {{ end }}
{{ end }}
{{ if eq .Status "resolved" }}
✅ **RESOLVED: {{ .Title }}**
• Message: {{ .Message }}
• Resolved at: {{ .Time }}
{{ end }}
{{ end }}
# /etc/influxdb2/templates/email.tmpl
Subject: InfluxDB Alert: {{ .Title }}
{{ range .Alerts }}
{{ if eq .Status "firing" }}
ALERT FIRED
Title: {{ .Title }}
Message: {{ .Message }}
Severity: {{ index .Labels "severity" }}
Service: {{ index .Labels "service" }}
Time: {{ .Time }}
Tags:
{{ range $key, $value := .Tags }}
- {{ $key }}: {{ $value }}
{{ end }}
{{ end }}
{{ if eq .Status "resolved" }}
ALERT RESOLVED
Title: {{ .Title }}
Message: {{ .Message }}
Resolved at: {{ .Time }}
Duration: {{ .Duration }}
{{ end }}
{{ end }}
# 5. Grafana Dashboard Configuration
# grafana-dashboard.json (excerpt)
{
  "dashboard": {
    "title": "System Monitoring Dashboard",
    "tags": ["system", "influxdb", "telegraf"],
    "timezone": "browser",
    "panels": [
      {
        "title": "CPU Usage",
        "type": "stat",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"cpu\") |> filter(fn: (r) => r._field == \"usage_active\") |> last()"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 70},
                {"color": "red", "value": 90}
              ]
            }
          }
        }
      },
      {
        "title": "Memory Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"mem\") |> filter(fn: (r) => r._field == \"used_percent\")"
          }
        ],
        "yAxes": [
          {
            "max": 100,
            "min": 0,
            "unit": "percent"
          }
        ]
      },
      {
        "title": "Disk Usage by Mount Point",
        "type": "piechart",
        "targets": [
          {
            "expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"disk\") |> filter(fn: (r) => r._field == \"used_percent\") |> last()"
          }
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "30s"
  }
}
# 6. Python Alert Manager Integration
# alert_manager.py
import requests
import json
import time
from datetime import datetime
from typing import Dict, List, Optional
class InfluxDBAlertManager:
    def __init__(self, influx_url: str, token: str, org: str):
        self.base_url = influx_url
        self.token = token
        self.org = org
        self.headers = {
            "Authorization": f"Token {token}",
            "Content-Type": "application/json"
        }

    def create_check(self, check_name: str, query: str, every: str, offset: str = "0s"):
        """Create an alert check in InfluxDB"""
        check_data = {
            "name": check_name,
            "query": query,
            "every": every,
            "offset": offset,
            "status": "active",
            "labels": {
                "source": "custom_alerts"
            }
        }
        url = f"{self.base_url}/api/v2/checks"
        response = requests.post(url, headers=self.headers, json=check_data)
        if response.status_code == 201:
            print(f"✅ Alert check '{check_name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create alert check: {response.text}")
            return None

    def get_alert_endpoints(self):
        """Get all notification endpoints"""
        url = f"{self.base_url}/api/v2/notificationEndpoints"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()["notificationEndpoints"]
        else:
            print(f"❌ Failed to get alert endpoints: {response.text}")
            return []

    def create_slack_endpoint(self, name: str, webhook_url: str, channel: str):
        """Create a Slack notification endpoint"""
        endpoint_data = {
            "name": name,
            "type": "slack",
            "url": webhook_url,
            "username": "InfluxDB Alerts",
            "channel": channel
        }
        url = f"{self.base_url}/api/v2/notificationEndpoints"
        response = requests.post(url, headers=self.headers, json=endpoint_data)
        if response.status_code == 201:
            print(f"✅ Slack endpoint '{name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create Slack endpoint: {response.text}")
            return None

    def create_notification_rule(self, rule_name: str, check_id: str, endpoint_id: str):
        """Create a notification rule"""
        rule_data = {
            "name": rule_name,
            "every": "1m",
            "offset": "0s",
            "status": "active",
            "check": {
                "type": "threshold",
                "checkID": check_id
            },
            "endpointID": endpoint_id,
            "statusRules": [
                {
                    "currentLevel": "OK",
                    "previousLevel": "CRITICAL",
                    "notify": True
                }
            ]
        }
        url = f"{self.base_url}/api/v2/notificationRules"
        response = requests.post(url, headers=self.headers, json=rule_data)
        if response.status_code == 201:
            print(f"✅ Notification rule '{rule_name}' created successfully")
            return response.json()
        else:
            print(f"❌ Failed to create notification rule: {response.text}")
            return None
# Usage example
def setup_monitoring():
    """Setup complete monitoring system"""
    alert_manager = InfluxDBAlertManager(
        "http://localhost:8086",
        "your-token",
        "your-org"
    )

    # Create custom checks
    high_cpu_query = '''
    from(bucket: "monitoring")
        |> range(start: -5m)
        |> filter(fn: (r) => r._measurement == "cpu")
        |> filter(fn: (r) => r._field == "usage_active")
        |> filter(fn: (r) => r._value > 80.0)
    '''
    alert_manager.create_check(
        "High CPU Usage",
        high_cpu_query,
        "1m",
        "30s"
    )

    # Create notification endpoints
    slack_endpoint = alert_manager.create_slack_endpoint(
        "DevOps Alerts",
        "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
        "#monitoring"
    )

    if slack_endpoint:
        # Create notification rule
        alert_manager.create_notification_rule(
            "CPU Alert Rule",
            "check-id-here",  # Replace with actual check ID
            slack_endpoint["id"]
        )

if __name__ == "__main__":
    setup_monitoring()
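setup_monitoring() leaves "check-id-here" as a placeholder. One way to resolve it at runtime is to list the existing checks and match by name; a sketch, assuming the /api/v2/checks endpoint (note it takes the organization ID, not the organization name):

def find_check_id(alert_manager: InfluxDBAlertManager, check_name: str, org_id: str):
    """Return the ID of the first check whose name matches, or None."""
    url = f"{alert_manager.base_url}/api/v2/checks"
    response = requests.get(url, headers=alert_manager.headers, params={"orgID": org_id})
    if response.status_code != 200:
        print(f"❌ Failed to list checks: {response.text}")
        return None
    for check in response.json().get("checks", []):
        if check["name"] == check_name:
            return check["id"]
    return None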
💻 InfluxDB Flux Queries flux
🟡 intermediate
⭐⭐⭐⭐
Advanced Flux query language examples for complex data analysis and transformations
⏱️ 30 min
🏷️ influxdb, flux, queries, analytics, time series
Prerequisites:
Basic InfluxDB operations, Understanding of data analysis concepts
// InfluxDB Flux Query Examples
// Advanced data analysis and transformations
// 1. Basic Query Patterns
// Query recent data from specific measurement
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> sort(columns: ["_time"])
    |> limit(n: 100)

// Query with multiple field filters
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage" or r._field == "memory_usage")
    |> filter(fn: (r) => r.host == "web-server-01")
// 2. Aggregation and Windowing
// Time-based aggregation: hourly mean
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> yield(name: "hourly_avg")

// Multiple aggregations: yield mean, max, and min from the same base stream
data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")

data
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> set(key: "_field", value: "temperature_mean")
    |> yield(name: "mean")

data
    |> aggregateWindow(every: 1h, fn: max, createEmpty: false)
    |> set(key: "_field", value: "temperature_max")
    |> yield(name: "max")

data
    |> aggregateWindow(every: 1h, fn: min, createEmpty: false)
    |> set(key: "_field", value: "temperature_min")
    |> yield(name: "min")
// 3. Data Transformation
// Unit conversion - Celsius to Fahrenheit
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 9.0 / 5.0 + 32.0,
        _field: "temperature_fahrenheit"
    }))

// Calculate percentage from decimal
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage")
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        _field: "cpu_usage_percent"
    }))

// 4. Filtering and Thresholding
// Filter by value thresholds
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> filter(fn: (r) => r._value > 30.0 or r._value < 10.0)

// Filter by time windows (business hours only)
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> hourSelection(start: 9, stop: 17)
// 5. Joins and Relations
// Join temperature and humidity data
temperature = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> rename(columns: {_value: "temperature"})

humidity = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "humidity")
    |> rename(columns: {_value: "humidity"})

join(
    tables: {t1: temperature, t2: humidity},
    on: ["_time", "location", "sensor_id"],
    method: "inner"
)

// 6. Mathematical Operations
// Calculate moving average
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> movingAverage(n: 10)

// Calculate rate of change (derivative)
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "memory_usage")
    |> derivative(unit: 1m, nonNegative: true)

// Calculate difference between consecutive points
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "counters")
    |> filter(fn: (r) => r._field == "request_count")
    |> difference(nonNegative: true)
// 7. Statistical Analysis
// Calculate quantiles (Flux uses quantile(); p50 / p95 / p99 below)
responses = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "response_time")

responses
    |> aggregateWindow(
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: 0.50, column: column),
        createEmpty: false
    )
    |> set(key: "_field", value: "response_time_p50")
    |> yield(name: "p50")

responses
    |> aggregateWindow(
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: 0.95, column: column),
        createEmpty: false
    )
    |> set(key: "_field", value: "response_time_p95")
    |> yield(name: "p95")

responses
    |> aggregateWindow(
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: 0.99, column: column),
        createEmpty: false
    )
    |> set(key: "_field", value: "response_time_p99")
    |> yield(name: "p99")

// Calculate standard deviation
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> stddev()
// 8. Time-based Analysis
// Analyze patterns by hour of day (date.hour extracts the hour from _time)
import "date"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with hour: date.hour(t: r._time)}))
    |> group(columns: ["hour"])
    |> mean()

// Day of week analysis (date.weekDay returns 0-6, Sunday = 0)
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "throughput")
    |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with day: date.weekDay(t: r._time)}))
    |> group(columns: ["day"])
    |> mean()
// 9. Custom Functions
// Custom function for anomaly detection: keep points more than `threshold`
// standard deviations away from the mean of the queried window
import "math"

anomalies = (tables=<-, threshold=2.0) => {
    m = (tables |> mean() |> findRecord(fn: (key) => true, idx: 0))._value
    s = (tables |> stddev() |> findRecord(fn: (key) => true, idx: 0))._value

    return tables
        |> filter(fn: (r) => math.abs(x: r._value - m) > threshold * s)
}

// Use custom function
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> anomalies(threshold: 2.0)
// 10. Performance Monitoring Queries
// Calculate SLA compliance
from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "response_time")
    |> map(fn: (r) => ({
        r with
        _value: if r._value <= 500 then 1.0 else 0.0,
        _field: "sla_compliant"
    }))
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({
        r with
        _value: r._value * 100.0,
        _field: "sla_compliance_percent"
    }))

// Error rate calculation
total_requests = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "request_count")
    |> sum()

error_requests = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "error_count")
    |> sum()

// sum() drops _time, so join on the window bounds instead
join(
    tables: {t1: total_requests, t2: error_requests},
    on: ["_start", "_stop"],
    method: "inner"
)
    |> map(fn: (r) => ({
        r with
        _value: r._value_t2 / r._value_t1 * 100.0,
        _field: "error_rate_percent"
    }))
// 11. Capacity Planning
// Growth rate calculation
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "storage_metrics")
    |> filter(fn: (r) => r._field == "disk_usage")
    |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
    |> derivative(unit: 1d)

// Predict future usage with Holt-Winters forecasting
// (holtWinters is Flux's built-in forecasting function; n is the number of
// predicted points, interval their spacing)
from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> holtWinters(n: 24, interval: 1h)
// 12. Complex Data Analysis
// Relate metrics by joining them on time
cpu_data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "cpu_usage")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

memory_data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "system_metrics")
    |> filter(fn: (r) => r._field == "memory_usage")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)

join(
    tables: {cpu: cpu_data, memory: memory_data},
    on: ["_time"],
    method: "inner"
)
    |> map(fn: (r) => ({
        r with
        cpu_memory_ratio: r._value_cpu / r._value_memory
    }))

// Heatmap data preparation (hour-of-day vs. day-of-week averages)
import "date"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> map(fn: (r) => ({r with hour: date.hour(t: r._time), day: date.weekDay(t: r._time)}))
    |> group(columns: ["day", "hour", "location"])
    |> mean()
    |> sort(columns: ["day", "hour"])
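Any of the queries above can be run from Python with the influxdb-client library; query_data_frame() is convenient for handing results to pandas. A minimal sketch, assuming a local server and the example-bucket used throughout (pandas must be installed):

from influxdb_client import InfluxDBClient

flux = '''
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
'''

with InfluxDBClient(url="http://localhost:8086", token="your-token-here", org="your-org") as client:
    df = client.query_api().query_data_frame(flux)
    print(df.head())  # may be a list of DataFrames for multi-yield queries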