🎯 Recommended Samples
A balanced collection of samples from various categories for you to explore
InfluxDB Database Samples
InfluxDB time series database examples including data modeling, queries, and monitoring
💻 InfluxDB Basic Operations python
🟢 simple
⭐⭐
Fundamental operations with InfluxDB including data writing, reading, and basic queries
⏱️ 20 min
🏷️ influxdb, time series, iot, monitoring, python
Prerequisites:
Python programming, Basic understanding of time series data
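Before running the sample, it helps to confirm the server is actually reachable. A minimal connectivity check, as a sketch: the URL, token, and org below are placeholders and must match your deployment.

from influxdb_client import InfluxDBClient

# Placeholder credentials; replace with your own deployment's values
with InfluxDBClient(url="http://localhost:8086", token="your-token-here", org="your-org") as client:
    # ping() returns True when the server responds
    if client.ping():
        print("✅ InfluxDB is reachable")
    else:
        print("❌ InfluxDB did not respond; check the URL and credentials")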
# InfluxDB Basic Operations
# Python - Using influxdb-client library
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
import time
import random
# Configuration
url = "http://localhost:8086"
token = "your-token-here"
org = "your-org"
bucket = "example-bucket"
# Initialize client
class InfluxDBManager:
def __init__(self, url, token, org):
self.url = url
self.token = token
self.org = org
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.client.close()
# 1. Writing Data Points
def write_single_point(self, measurement, tags, fields, timestamp=None):
"""Write a single data point"""
point = Point(measurement)
# Add tags
for key, value in tags.items():
point.tag(key, value)
# Add fields
for key, value in fields.items():
point.field(key, value)
# Set timestamp
if timestamp:
point.time(timestamp, WritePrecision.NS)
try:
self.write_api.write(bucket=bucket, record=point)
print(f"✅ Written point to {measurement}")
except Exception as e:
print(f"❌ Error writing point: {e}")
def write_multiple_points(self, points):
"""Write multiple data points at once"""
try:
self.write_api.write(bucket=bucket, record=points)
print(f"✅ Written {len(points)} points")
except Exception as e:
print(f"❌ Error writing points: {e}")
# 2. Data Modeling Examples
def write_sensor_data(self):
"""Write IoT sensor data"""
# Temperature sensor readings
for i in range(10):
temperature = 20 + random.uniform(-5, 15)
humidity = 60 + random.uniform(-20, 30)
timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i*5)
point = Point("environmental") .tag("location", "office") .tag("sensor_id", "temp_001") .tag("room", "conference") .field("temperature", temperature) .field("humidity", humidity) .time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=bucket, record=point)
# CPU and memory metrics
for i in range(20):
cpu_usage = random.uniform(20, 90)
memory_usage = random.uniform(30, 85)
timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i)
point = Point("system_metrics") .tag("host", "web-server-01") .tag("region", "us-west-1") .tag("environment", "production") .field("cpu_usage", cpu_usage) .field("memory_usage", memory_usage) .field("disk_usage", random.uniform(40, 70)) .time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=bucket, record=point)
def write_application_metrics(self):
"""Write application performance metrics"""
for i in range(15):
response_time = random.uniform(100, 2000)
error_count = random.choices([0, 1, 2, 3], weights=[70, 20, 8, 2])[0]
active_users = random.randint(100, 500)
timestamp = datetime.datetime.now() - datetime.timedelta(minutes=i*2)
point = Point("application_metrics") .tag("service", "user-service") .tag("endpoint", "/api/users") .tag("version", "v1.2.3") .field("response_time", response_time) .field("error_count", error_count) .field("active_users", active_users) .field("throughput", random.uniform(50, 200)) .time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=bucket, record=point)
# 3. Querying Data
def query_recent_data(self, measurement, duration="1h"):
"""Query recent data from a measurement"""
query = f'''
from(bucket: "{bucket}")
|> range(start: -{duration})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> limit(n: 10)
'''
try:
result = self.query_api.query(query, org=self.org)
records = []
for table in result:
for record in table.records:
records.append({
'time': record.get_time(),
'field': record.get_field(),
'value': record.get_value(),
'measurement': record.get_measurement(),
'tags': record.values
})
print(f"📊 Found {len(records)} records from {measurement}")
return records
except Exception as e:
print(f"❌ Error querying data: {e}")
return []
def query_by_tags(self, measurement, tag_filters, duration="24h"):
"""Query data with specific tag filters"""
tag_conditions = []
for key, value in tag_filters.items():
tag_conditions.append(f'r.{key} == "{value}"')
tag_filter = ' and '.join(tag_conditions)
query = f'''
from(bucket: "{bucket}")
|> range(start: -{duration})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => {tag_filter})
|> sort(columns: ["_time"])
'''
try:
result = self.query_api.query(query, org=self.org)
return self._process_query_result(result)
except Exception as e:
print(f"❌ Error querying by tags: {e}")
return []
def query_aggregated_data(self, measurement, field, aggregation="mean", window="5m", duration="1h"):
"""Query aggregated data"""
query = f'''
from(bucket: "{bucket}")
|> range(start: -{duration})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => r._field == "{field}")
|> aggregateWindow(every: {window}, fn: {aggregation}, createEmpty: false)
|> yield(name: "{aggregation}")
'''
try:
result = self.query_api.query(query, org=self.org)
return self._process_query_result(result)
except Exception as e:
print(f"❌ Error querying aggregated data: {e}")
return []
def _process_query_result(self, result):
"""Process query result into list of dictionaries"""
records = []
for table in result:
for record in table.records:
records.append({
'time': record.get_time(),
'field': record.get_field(),
'value': record.get_value(),
'measurement': record.get_measurement(),
'tags': {k: v for k, v in record.values.items() if not k.startswith('_')}
})
return records
# 4. Advanced Queries
    def query_downsampled_data(self, measurement, duration="24h", window="1h"):
        """Query downsampled data using a windowed mean aggregation"""
        query = f'''
        from(bucket: "{bucket}")
          |> range(start: -{duration})
          |> filter(fn: (r) => r._measurement == "{measurement}")
          |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
          |> set(key: "_field", value: "mean")
          |> yield(name: "mean")
        '''
try:
result = self.query_api.query(query, org=self.org)
return self._process_query_result(result)
except Exception as e:
print(f"❌ Error querying downsampled data: {e}")
return []
def query_with_transformations(self):
"""Query with data transformations"""
query = '''
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "system_metrics")
|> filter(fn: (r) => r._field == "cpu_usage")
|> map(fn: (r) => ({ r with _value: r._value / 100.0 }))
|> filter(fn: (r) => r._value > 0.8)
|> sort(columns: ["_value"], desc: true)
|> limit(n: 5)
'''
try:
result = self.query_api.query(query, org=self.org)
return self._process_query_result(result)
except Exception as e:
print(f"❌ Error with transformed query: {e}")
return []
# 5. Batch Operations
def batch_write_sensor_data(self, num_points=100):
"""Write multiple sensor data points in batch"""
points = []
locations = ["office", "warehouse", "factory", "datacenter"]
for i in range(num_points):
location = random.choice(locations)
temperature = 20 + random.uniform(-10, 20)
humidity = 50 + random.uniform(-30, 40)
pressure = 1000 + random.uniform(-50, 50)
point = Point("environmental_sensors") .tag("location", location) .tag("sensor_type", "weather_station") .field("temperature", temperature) .field("humidity", humidity) .field("pressure", pressure) .time(datetime.datetime.now() - datetime.timedelta(seconds=i*30), WritePrecision.NS)
points.append(point)
self.write_multiple_points(points)
return points
# Usage Examples
def demonstrate_basic_operations():
"""Demonstrate basic InfluxDB operations"""
with InfluxDBManager(url, token, org) as db:
print("🚀 Starting InfluxDB Basic Operations Demo")
# 1. Write single data points
print("
📝 Writing single data points...")
db.write_single_point(
measurement="test_data",
tags={"source": "demo", "version": "1.0"},
fields={"value": 42.5, "status": "ok"}
)
# 2. Write sensor data
print("
🌡️ Writing sensor data...")
db.write_sensor_data()
# 3. Write application metrics
print("
📊 Writing application metrics...")
db.write_application_metrics()
# 4. Query recent data
print("
🔍 Querying recent data...")
recent_data = db.query_recent_data("environmental", "30m")
for record in recent_data[:5]:
print(f" {record['time']}: {record['field']} = {record['value']}")
# 5. Query with tag filters
print("
🏷️ Querying with tag filters...")
filtered_data = db.query_by_tags(
measurement="system_metrics",
tag_filters={"host": "web-server-01", "environment": "production"},
duration="1h"
)
print(f"Found {len(filtered_data)} records with specified tags")
# 6. Query aggregated data
print("
📈 Querying aggregated data...")
avg_cpu = db.query_aggregated_data(
measurement="system_metrics",
field="cpu_usage",
aggregation="mean",
window="10m",
duration="1h"
)
print(f"Average CPU usage points: {len(avg_cpu)}")
# 7. Batch write
print("
💾 Batch writing sensor data...")
batch_points = db.batch_write_sensor_data(50)
print(f"Batch written {len(batch_points)} points")
# 6. Data Exploration
class InfluxDBExplorer:
    def __init__(self, url, token, org):
        self.org = org
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.query_api = self.client.query_api()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()
def list_measurements(self, bucket="example-bucket"):
"""List all measurements in a bucket"""
query = f'''
import "influxdata/influxdb/schema"
schema.measurements(bucket: "{bucket}")
'''
try:
result = self.query_api.query(query, org=self.org)
measurements = [record.get_value() for table in result for record in table.records]
print(f"📋 Measurements in bucket '{bucket}': {measurements}")
return measurements
except Exception as e:
print(f"❌ Error listing measurements: {e}")
return []
def show_tag_keys(self, measurement, bucket="example-bucket"):
"""Show all tag keys for a measurement"""
query = f'''
import "influxdata/influxdb/schema"
schema.tagKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
'''
try:
result = self.query_api.query(query, org=self.org)
tag_keys = [record.get_value() for table in result for record in table.records]
print(f"🏷️ Tag keys for '{measurement}': {tag_keys}")
return tag_keys
except Exception as e:
print(f"❌ Error showing tag keys: {e}")
return []
def show_field_keys(self, measurement, bucket="example-bucket"):
"""Show all field keys for a measurement"""
query = f'''
import "influxdata/influxdb/schema"
schema.fieldKeys(bucket: "{bucket}", predicate: (r) => r._measurement == "{measurement}")
'''
try:
result = self.query_api.query(query, org=self.org)
field_keys = [record.get_value() for table in result for record in table.records]
print(f"🔢 Field keys for '{measurement}': {field_keys}")
return field_keys
except Exception as e:
print(f"❌ Error showing field keys: {e}")
return []
def get_bucket_stats(self, bucket="example-bucket"):
"""Get basic statistics for a bucket"""
query = f'''
from(bucket: "{bucket}")
|> range(start: -24h)
|> group(columns: ["_measurement"])
|> count()
'''
try:
result = self.query_api.query(query, org=self.org)
stats = {}
            for table in result:
                if not table.records:
                    continue
                measurement = table.records[0].values.get("_measurement")
                count = table.records[0].get_value()
                stats[measurement] = count
print(f"📊 Bucket statistics for '{bucket}':")
for measurement, count in stats.items():
print(f" {measurement}: {count} points")
return stats
except Exception as e:
print(f"❌ Error getting bucket stats: {e}")
return {}
def demonstrate_exploration():
"""Demonstrate data exploration capabilities"""
with InfluxDBExplorer(url, token, org) as explorer:
print("
🔍 Exploring InfluxDB Data Structure")
measurements = explorer.list_measurements()
if measurements:
for measurement in measurements[:3]: # Show first 3
explorer.show_tag_keys(measurement)
explorer.show_field_keys(measurement)
explorer.get_bucket_stats()
if __name__ == "__main__":
# Run the demonstrations
demonstrate_basic_operations()
demonstrate_exploration()
💻 InfluxDB Monitoring and Alerts yaml
🔴 advanced
⭐⭐⭐⭐
Setting up monitoring dashboards, alerting rules, and notification systems with InfluxDB
⏱️ 35 min
🏷️ influxdb, monitoring, alerting, devops, telegraf
Prerequisites:
Advanced InfluxDB operations, System administration, Monitoring concepts
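Before Telegraf can write anywhere, the target bucket has to exist. A minimal sketch for creating it from Python, assuming the placeholder credentials used throughout this sample and an arbitrary 30-day retention period:

from influxdb_client import InfluxDBClient, BucketRetentionRules

with InfluxDBClient(url="http://localhost:8086", token="your-auth-token", org="your-org") as client:
    buckets_api = client.buckets_api()
    # Create the "monitoring" bucket Telegraf writes into, if it is missing
    if buckets_api.find_bucket_by_name("monitoring") is None:
        buckets_api.create_bucket(
            bucket_name="monitoring",
            org="your-org",
            # 30-day retention is an assumption; adjust to your needs
            retention_rules=BucketRetentionRules(type="expire", every_seconds=30 * 24 * 3600),
        )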
# InfluxDB Monitoring and Alerting
# Configuration files and examples for setting up monitoring systems
# 1. Telegraf Configuration for System Monitoring
# /etc/telegraf/telegraf.conf
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
# Output to InfluxDB
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "your-auth-token"
organization = "your-org"
bucket = "monitoring"
# System input plugins
[[inputs.cpu]]
percpu = true
totalcpu = true
collect_cpu_time = false
report_active = true  # required for the usage_active field queried by the alerts below
core_tags = true
[[inputs.mem]]
fieldpass = ["available", "available_percent", "buffered", "cached", "free", "total", "used", "used_percent"]
[[inputs.disk]]
mount_points = ["/"]
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.kernel]]
# No configuration needed, collects kernel metrics
[[inputs.processes]]
collect_parent_pid = true
[[inputs.swap]]
fieldpass = ["used", "used_percent", "free", "total"]
[[inputs.system]]
fieldpass = ["uptime_format", "load1", "load5", "load15"]
# Network monitoring
[[inputs.net]]
fieldpass = ["bytes_sent", "bytes_recv", "packets_sent", "packets_recv", "err_in", "err_out", "drop_in", "drop_out"]
[[inputs.netstat]]
fieldpass = ["tcp_listen", "tcp_established", "tcp_close_wait", "tcp_time_wait", "udp_listen"]
# Application monitoring
[[inputs.prometheus]]
urls = ["http://localhost:9090/metrics"]
metric_version = 2
[[inputs.http_response]]
urls = ["http://localhost:8080/health", "http://localhost:3000/api/health"]
timeout = "5s"
method = "GET"
follow_redirects = true
response_timeout = "5s"
response_string_match = "ok"
# Docker monitoring
[[inputs.docker]]
endpoint = "unix:///var/run/docker.sock"
container_names = []
timeout = "5s"
perdevice = true
total = true
docker_label_include = []
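Once Telegraf is running, a quick sanity check is to query the last few CPU points from Python. A sketch, reusing the placeholder connection values from this sample:

from influxdb_client import InfluxDBClient

flux = '''
from(bucket: "monitoring")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r._field == "usage_active")
  |> limit(n: 5)
'''

with InfluxDBClient(url="http://localhost:8086", token="your-auth-token", org="your-org") as client:
    for table in client.query_api().query(flux):
        for record in table.records:
            print(record.get_time(), record.values.get("cpu"), record.get_value())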
# 2. InfluxDB 2.x Alerting Configuration
# alert-rules.yaml
apiVersion: 1
groups:
- name: system_alerts
rules:
- alert: HighCPUUsage
type: query
query: |
from(bucket: "monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_active")
|> aggregateWindow(every: 1m, fn: mean)
|> filter(fn: (r) => r._value > 80.0)
status: OK
statusRules:
- current: [OK, Warning, Critical]
color: "green"
- current: [OK, Warning]
color: "yellow"
- current: [OK, Critical]
color: "red"
message: "High CPU usage detected: {{ index .Tags "host" }} has {{ .Values.C }}% CPU usage"
labels:
severity: warning
service: system
- alert: HighMemoryUsage
type: query
query: |
from(bucket: "monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "used_percent")
|> aggregateWindow(every: 1m, fn: mean)
|> filter(fn: (r) => r._value > 85.0)
status: OK
statusRules:
- current: [OK, Warning, Critical]
color: "green"
- current: [OK, Warning]
color: "yellow"
- current: [OK, Critical]
color: "red"
message: "High memory usage on {{ index .Tags "host" }}: {{ .Values.C }}% used"
labels:
severity: warning
service: system
- alert: DiskSpaceLow
type: query
query: |
from(bucket: "monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "disk")
|> filter(fn: (r) => r._field == "used_percent")
|> aggregateWindow(every: 1m, fn: mean)
|> filter(fn: (r) => r._value > 90.0)
status: OK
statusRules:
- current: [OK, Critical]
color: "red"
message: "Disk space critically low on {{ index .Tags "host" }}/{{ index .Tags "path" }}: {{ .Values.C }}% used"
labels:
severity: critical
service: system
- name: application_alerts
rules:
- alert: HighErrorRate
type: query
query: |
from(bucket: "monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> filter(fn: (r) => r._field == "error_rate")
|> filter(fn: (r) => r._value > 5.0)
status: OK
statusRules:
- current: [OK, Warning, Critical]
color: "green"
- current: [OK, Warning]
color: "yellow"
- current: [OK, Critical]
color: "red"
message: "High error rate in {{ index .Tags "service" }}: {{ .Values.C }}% errors"
labels:
severity: warning
service: application
- alert: HighResponseTime
type: query
query: |
from(bucket: "monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> filter(fn: (r) => r._field == "response_time_p95")
|> filter(fn: (r) => r._value > 1000.0)
status: OK
statusRules:
- current: [OK, Warning, Critical]
color: "green"
- current: [OK, Warning]
color: "yellow"
- current: [OK, Critical]
color: "red"
message: "High response time in {{ index .Tags "service" }}: {{ .Values.C }}ms P95"
labels:
severity: warning
service: application
- alert: ServiceDown
type: threshold
thresholds:
- value: 0
level: critical
message: "Service {{ index .Tags "url" }} is down"
labels:
severity: critical
service: availability
- name: custom_threshold_alerts
rules:
- alert: TemperatureHigh
type: query
query: |
from(bucket: "monitoring")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> filter(fn: (r) => r._value > 35.0)
status: OK
statusRules:
- current: [OK, Warning, Critical]
color: "green"
- current: [OK, Warning]
color: "yellow"
- current: [OK, Critical]
color: "red"
message: "High temperature alert: {{ index .Tags "location" }} sensor reports {{ .Values.C }}°C"
labels:
severity: warning
service: environmental
# 3. Notification Endpoints Configuration
# /etc/influxdb2/notifiers.yaml
apiVersion: 1
notifiers:
- name: SlackNotification
type: slack
url: https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
username: InfluxDBAlerts
channel: "#alerts"
color: critical
template_path: /etc/influxdb2/templates/slack.tmpl
- name: EmailNotification
type: smtp
username: [email protected]
password: your-email-password
host: smtp.gmail.com
port: 587
from: [email protected]
to: "[email protected],[email protected]"
subject: "InfluxDB Alert: {{ .Title }}"
template_path: /etc/influxdb2/templates/email.tmpl
- name: PagerDutyNotification
type: pagerduty
serviceKey: your-pagerduty-service-key
severity: critical
details:
firing: "{{ .State }}"
threshold: "{{ .Thresholds }}"
- name: WebhookNotification
type: http
url: http://your-webhook-endpoint/alerts
method: POST
headers:
Authorization: "Bearer your-token"
Content-Type: "application/json"
body: |
{
"alert": "{{ .Title }}",
"message": "{{ .Message }}",
"severity": "{{ index .Labels "severity" }}",
"timestamp": "{{ .Time }}",
"tags": {{ .Tags | toJSON }}
}
# 4. Notification Templates
# /etc/influxdb2/templates/slack.tmpl
{{ range .Alerts }}
{{ if eq .Status "firing" }}
🚨 **ALERT: {{ .Title }}**
• Message: {{ .Message }}
• Severity: {{ index .Labels "severity" }}
• Time: {{ .Time }}
• Tags: {{ range $key, $value := .Tags }}{{ $key }}={{ $value }} {{ end }}
{{ end }}
{{ if eq .Status "resolved" }}
✅ **RESOLVED: {{ .Title }}**
• Message: {{ .Message }}
• Resolved at: {{ .Time }}
{{ end }}
{{ end }}
# /etc/influxdb2/templates/email.tmpl
Subject: InfluxDB Alert: {{ .Title }}
{{ range .Alerts }}
{{ if eq .Status "firing" }}
ALERT FIRED
Title: {{ .Title }}
Message: {{ .Message }}
Severity: {{ index .Labels "severity" }}
Service: {{ index .Labels "service" }}
Time: {{ .Time }}
Tags:
{{ range $key, $value := .Tags }}
- {{ $key }}: {{ $value }}
{{ end }}
{{ end }}
{{ if eq .Status "resolved" }}
ALERT RESOLVED
Title: {{ .Title }}
Message: {{ .Message }}
Resolved at: {{ .Time }}
Duration: {{ .Duration }}
{{ end }}
{{ end }}
# 5. Grafana Dashboard Configuration
# grafana-dashboard.json (excerpt)
{
"dashboard": {
"title": "System Monitoring Dashboard",
"tags": ["system", "influxdb", "telegraf"],
"timezone": "browser",
"panels": [
{
"title": "CPU Usage",
"type": "stat",
"targets": [
{
"expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"cpu\") |> filter(fn: (r) => r._field == \"usage_active\") |> last()"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 70},
{"color": "red", "value": 90}
]
}
}
}
},
{
"title": "Memory Usage",
"type": "graph",
"targets": [
{
"expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"mem\") |> filter(fn: (r) => r._field == \"used_percent\")"
}
],
"yAxes": [
{
"max": 100,
"min": 0,
"unit": "percent"
}
]
},
{
"title": "Disk Usage by Mount Point",
"type": "piechart",
"targets": [
{
"expr": "from(bucket: \"monitoring\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r._measurement == \"disk\") |> filter(fn: (r) => r._field == \"used_percent\") |> last()"
}
]
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "30s"
}
}
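A dashboard JSON like the excerpt above can be provisioned through Grafana's HTTP API rather than imported by hand. A sketch; the Grafana URL and API token are assumptions:

import json
import requests

grafana_url = "http://localhost:3000"  # assumed local Grafana instance
headers = {"Authorization": "Bearer your-grafana-api-key"}

with open("grafana-dashboard.json") as f:
    payload = json.load(f)  # expects the {"dashboard": {...}} wrapper shown above
payload["overwrite"] = True  # update in place if the dashboard already exists

# POST /api/dashboards/db creates or updates a dashboard
response = requests.post(f"{grafana_url}/api/dashboards/db", headers=headers, json=payload)
print(response.status_code, response.json())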
# 6. Python Alert Manager Integration
# alert_manager.py
import requests
import json
import time
from datetime import datetime
from typing import Dict, List, Optional
class InfluxDBAlertManager:
def __init__(self, influx_url: str, token: str, org: str):
self.base_url = influx_url
self.token = token
self.org = org
self.headers = {
"Authorization": f"Token {token}",
"Content-Type": "application/json"
}
def create_check(self, check_name: str, query: str, every: str, offset: str = "0s"):
"""Create an alert check in InfluxDB"""
check_data = {
"name": check_name,
"query": query,
"every": every,
"offset": offset,
"status": "active",
"labels": {
"source": "custom_alerts"
}
}
url = f"{self.base_url}/api/v2/checks"
response = requests.post(url, headers=self.headers, json=check_data)
if response.status_code == 201:
print(f"✅ Alert check '{check_name}' created successfully")
return response.json()
else:
print(f"❌ Failed to create alert check: {response.text}")
return None
def get_alert_endpoints(self):
"""Get all notification endpoints"""
url = f"{self.base_url}/api/v2/notificationEndpoints"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()["notificationEndpoints"]
else:
print(f"❌ Failed to get alert endpoints: {response.text}")
return []
def create_slack_endpoint(self, name: str, webhook_url: str, channel: str):
"""Create a Slack notification endpoint"""
endpoint_data = {
"name": name,
"type": "slack",
"url": webhook_url,
"username": "InfluxDB Alerts",
"channel": channel
}
url = f"{self.base_url}/api/v2/notificationEndpoints"
response = requests.post(url, headers=self.headers, json=endpoint_data)
if response.status_code == 201:
print(f"✅ Slack endpoint '{name}' created successfully")
return response.json()
else:
print(f"❌ Failed to create Slack endpoint: {response.text}")
return None
def create_notification_rule(self, rule_name: str, check_id: str, endpoint_id: str):
"""Create a notification rule"""
rule_data = {
"name": rule_name,
"every": "1m",
"offset": "0s",
"status": "active",
"check": {
"type": "threshold",
"checkID": check_id
},
"endpointID": endpoint_id,
"statusRules": [
{
"currentLevel": "OK",
"previousLevel": "CRITICAL",
"notify": True
}
]
}
url = f"{self.base_url}/api/v2/notificationRules"
response = requests.post(url, headers=self.headers, json=rule_data)
if response.status_code == 201:
print(f"✅ Notification rule '{rule_name}' created successfully")
return response.json()
else:
print(f"❌ Failed to create notification rule: {response.text}")
return None
# Usage example
def setup_monitoring():
"""Setup complete monitoring system"""
alert_manager = InfluxDBAlertManager(
"http://localhost:8086",
"your-token",
"your-org"
)
# Create custom checks
high_cpu_query = '''
from(bucket: "monitoring")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_active")
|> filter(fn: (r) => r._value > 80.0)
'''
alert_manager.create_check(
"High CPU Usage",
high_cpu_query,
"1m",
"30s"
)
# Create notification endpoints
slack_endpoint = alert_manager.create_slack_endpoint(
"DevOps Alerts",
"https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
"#monitoring"
)
if slack_endpoint:
# Create notification rule
alert_manager.create_notification_rule(
"CPU Alert Rule",
"check-id-here", # Replace with actual check ID
slack_endpoint["id"]
)
if __name__ == "__main__":
setup_monitoring()
💻 InfluxDB Flux Queries flux
🟡 intermediate
⭐⭐⭐⭐
Advanced Flux query language examples for complex data analysis and transformations
⏱️ 30 min
🏷️ influxdb, flux, queries, analytics, time series
Prerequisites:
Basic InfluxDB operations, Understanding of data analysis concepts
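Each query below is standalone Flux. From Python they can be executed as-is, or pulled into a pandas DataFrame for further analysis. A sketch, assuming the placeholder credentials from the first sample and pandas installed (query_data_frame requires it):

from influxdb_client import InfluxDBClient

flux = '''
from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r._field == "temperature")
'''

with InfluxDBClient(url="http://localhost:8086", token="your-token-here", org="your-org") as client:
    df = client.query_api().query_data_frame(flux)
    print(df[["_time", "_value"]].head())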
// InfluxDB Flux Query Examples
// Advanced data analysis and transformations
// 1. Basic Query Patterns
// Query recent data from specific measurement
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> sort(columns: ["_time"])
|> limit(n: 100)
// Query with multiple field filters
from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "system_metrics")
|> filter(fn: (r) => r._field == "cpu_usage" or r._field == "memory_usage")
|> filter(fn: (r) => r.host == "web-server-01")
// 2. Aggregation and Windowing
// Time-based aggregation with mean, min, max
from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> aggregateWindow(
every: 1h,
fn: mean,
createEmpty: false
)
|> yield(name: "hourly_avg")
// Multiple aggregations from a shared base stream
data = from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")

data
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> set(key: "_field", value: "temperature_mean")
    |> yield(name: "mean")

data
    |> aggregateWindow(every: 1h, fn: max, createEmpty: false)
    |> set(key: "_field", value: "temperature_max")
    |> yield(name: "max")

data
    |> aggregateWindow(every: 1h, fn: min, createEmpty: false)
    |> set(key: "_field", value: "temperature_min")
    |> yield(name: "min")
// 3. Data Transformation
// Unit conversion - Celsius to Fahrenheit
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> map(fn: (r) => ({
r with
_value: r._value * 9.0 / 5.0 + 32.0,
_field: "temperature_fahrenheit"
}))
// Calculate percentage from decimal
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "system_metrics")
|> filter(fn: (r) => r._field == "cpu_usage")
|> map(fn: (r) => ({
r with
_value: r._value * 100.0,
_field: "cpu_usage_percent"
}))
// 4. Filtering and Thresholding
// Filter by value thresholds
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> filter(fn: (r) => r._value > 30.0 or r._value < 10.0)
// Filter by time windows (business hours only)
from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> hourSelection(start: 9, stop: 17)
// 5. Joins and Relations
// Join temperature and humidity data
temperature = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> rename(columns: {_value: "temperature"})
humidity = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "humidity")
|> rename(columns: {_value: "humidity"})
join(
tables: {t1: temperature, t2: humidity},
on: ["_time", "location", "sensor_id"],
method: "inner"
)
// 6. Mathematical Operations
// Calculate moving average
from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> movingAverage(n: 10)
// Calculate rate of change (derivative)
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "system_metrics")
|> filter(fn: (r) => r._field == "memory_usage")
|> derivative(unit: 1m, nonNegative: true)
// Calculate difference between consecutive points
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "counters")
|> filter(fn: (r) => r._field == "request_count")
|> difference(nonNegative: true)
// 7. Statistical Analysis
// Calculate percentiles
from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> filter(fn: (r) => r._field == "response_time")
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column="_value") => tables
|> percentile(column: column, percentile: 0.50)
|> set(key: "_field", value: "response_time_p50")
|> yield(name: "p50"),
createEmpty: false
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column="_value") => tables
|> percentile(column: column, percentile: 0.95)
|> set(key: "_field", value: "response_time_p95")
|> yield(name: "p95"),
createEmpty: false
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column="_value") => tables
|> percentile(column: column, percentile: 0.99)
|> set(key: "_field", value: "response_time_p99")
|> yield(name: "p99"),
createEmpty: false
)
// Calculate standard deviation
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> stddev()
// 8. Time-based Analysis
// Analyze patterns by hour of day
import "date"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> map(fn: (r) => ({r with hour: date.hour(t: r._time)}))
    |> group(columns: ["hour"])
    |> mean()
// Day of week analysis
import "date"

from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "throughput")
    |> map(fn: (r) => ({r with weekday: date.weekDay(t: r._time)}))
    |> group(columns: ["weekday"])
    |> mean()
// 9. Custom Functions
// Custom function for anomaly detection: keeps points more than
// `threshold` standard deviations away from the series mean
anomalies = (tables=<-, threshold=2.0) => {
    m = (tables |> mean() |> findRecord(fn: (key) => true, idx: 0))._value
    s = (tables |> stddev() |> findRecord(fn: (key) => true, idx: 0))._value

    return tables
        |> filter(fn: (r) => r._value > m + threshold * s or r._value < m - threshold * s)
}
// Use custom function
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> anomalies(threshold: 2.0)
// 10. Performance Monitoring Queries
// Calculate SLA compliance
from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> filter(fn: (r) => r._field == "response_time")
|> map(fn: (r) => ({
r with
        _value: if r._value <= 500.0 then 1.0 else 0.0,
_field: "sla_compliant"
}))
|> aggregateWindow(
every: 1h,
fn: mean,
createEmpty: false
)
|> map(fn: (r) => ({
r with
_value: r._value * 100.0,
_field: "sla_compliance_percent"
}))
// Error rate calculation
total_requests = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> filter(fn: (r) => r._field == "request_count")
|> sum()
error_requests = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "application_metrics")
|> filter(fn: (r) => r._field == "error_count")
|> sum()
join(
    tables: {t1: total_requests, t2: error_requests},
    on: ["_start", "_stop"],
    method: "inner"
)
|> map(fn: (r) => ({
    r with
    _value: float(v: r._value_t2) / float(v: r._value_t1) * 100.0,
    _field: "error_rate_percent"
}))
// 11. Capacity Planning
// Growth rate calculation
from(bucket: "example-bucket")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "storage_metrics")
|> filter(fn: (r) => r._field == "disk_usage")
|> aggregateWindow(
every: 1d,
fn: mean,
createEmpty: false
)
|> derivative(unit: 1d)
// Predict future usage (linear gap-filling plus a Holt-Winters forecast)
import "interpolate"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "application_metrics")
    |> filter(fn: (r) => r._field == "active_users")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> interpolate.linear(every: 1h)
    |> holtWinters(n: 24, interval: 1h)
// 12. Complex Data Analysis
// Correlation analysis between metrics
cpu_data = from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "system_metrics")
|> filter(fn: (r) => r._field == "cpu_usage")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
memory_data = from(bucket: "example-bucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "system_metrics")
|> filter(fn: (r) => r._field == "memory_usage")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
join(
tables: {cpu: cpu_data, memory: memory_data},
on: ["_time"],
method: "inner"
)
|> map(fn: (r) => ({
r with
cpu_memory_ratio: r._value_cpu / r._value_memory
}))
// Heatmap data preparation: average temperature by weekday and hour
import "date"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")
    |> map(fn: (r) => ({r with hour: date.hour(t: r._time), weekday: date.weekDay(t: r._time)}))
    |> group(columns: ["weekday", "hour", "location"])
    |> mean()
    |> sort(columns: ["weekday", "hour"])