Exemples HashiCorp Vault

Exemples d'implémentation HashiCorp Vault pour la gestion sécurisée des secrets, du chiffrement et du contrôle d'accès

💻 Gestion des Secrets Vault go

🟡 intermediate ⭐⭐⭐⭐

Implémentation complète de gestion des secrets Vault avec authentification, opérations CRUD et gestion des politiques

⏱️ 50 min 🏷️ vault, secrets, go, security, hashicorp
Prerequisites: Go programming, HashiCorp Vault concepts, Secrets management, Security
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/hashicorp/vault/api"
	"golang.org/x/crypto/bcrypt"
)

// VaultSecretsManager provides a high-level interface for HashiCorp Vault operations.
// It bundles the Vault API client with a local record of the policies written
// through createPolicy and a logger for operational messages.
type VaultSecretsManager struct {
	// client is the Vault API client every operation is issued through.
	client   *api.Client
	// policies maps a policy name to the rules text last written by createPolicy.
	policies map[string]string
	// logger receives the manager's informational and warning messages.
	logger   *log.Logger
}

// VaultConfig holds configuration for connecting to Vault.
// authenticate picks the login method from these fields in order: Token,
// then RoleID together with SecretID, then K8SMountPath.
type VaultConfig struct {
	// Address is the Vault server address handed to the API client config.
	Address      string
	// Token, when non-empty, is set on the client directly (no login request).
	Token        string
	// RoleID and SecretID together select AppRole login.
	RoleID       string
	SecretID     string
	// K8SMountPath is the auth mount used for Kubernetes login ("auth/<mount>/login").
	K8SMountPath string
	// Namespace, when non-empty, is applied to the client before authenticating.
	Namespace    string
}

// SecretData represents a secret structure: a path, its key/value payload,
// optional metadata and TTL, and creation/update timestamps.
// NOTE(review): no function in the visible part of this file constructs or
// consumes this type — confirm it is still needed.
type SecretData struct {
	Path      string                 `json:"path"`
	Data      map[string]interface{} `json:"data"`
	// Metadata and TTL are dropped from the JSON encoding when empty/nil.
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	TTL       *time.Duration         `json:"ttl,omitempty"`
	CreatedAt time.Time              `json:"created_at"`
	UpdatedAt time.Time              `json:"updated_at"`
}

// Policy represents a Vault policy as a name plus its rules text.
// NOTE(review): the policy helpers visible in this file pass name and rules
// as plain strings and do not use this type — confirm intended usage.
type Policy struct {
	Name     string `json:"name"`
	Rules    string `json:"rules"`
	// Template is omitted from the JSON encoding when empty.
	Template string `json:"template,omitempty"`
}

// NewVaultSecretsManager builds a VaultSecretsManager from config: it creates
// the API client, applies the optional namespace, authenticates, and then
// attempts to install the default policies.
//
// An empty config.Address leaves the address selected by api.DefaultConfig
// in place instead of overwriting it with an empty string.
//
// Client-creation and authentication failures are returned wrapped; a failure
// reported by initializeDefaultPolicies is only logged as a warning.
func NewVaultSecretsManager(config VaultConfig) (*VaultSecretsManager, error) {
	// Configure Vault client
	vaultConfig := api.DefaultConfig()
	if config.Address != "" {
		vaultConfig.Address = config.Address
	}

	client, err := api.NewClient(vaultConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create Vault client: %w", err)
	}

	vsm := &VaultSecretsManager{
		client:   client,
		policies: make(map[string]string),
		logger:   log.New(os.Stdout, "[VAULT] ", log.LstdFlags),
	}

	// The namespace must be in place before the login request is sent.
	if config.Namespace != "" {
		vsm.client.SetNamespace(config.Namespace)
	}

	// Authenticate with Vault
	if err := vsm.authenticate(config); err != nil {
		return nil, fmt.Errorf("failed to authenticate with Vault: %w", err)
	}

	vsm.logger.Println("Successfully connected to Vault")

	// Default policies are best-effort: the manager is still usable without them.
	if err := vsm.initializeDefaultPolicies(); err != nil {
		vsm.logger.Printf("Warning: failed to initialize default policies: %v", err)
	}

	return vsm, nil
}

// authenticate selects a login method from config, in priority order:
// a static token, then AppRole (RoleID and SecretID both set), then
// Kubernetes (K8SMountPath set). It returns an error when none applies.
func (vsm *VaultSecretsManager) authenticate(config VaultConfig) error {
	switch {
	case config.Token != "":
		// A pre-issued token needs no login round trip.
		vsm.client.SetToken(config.Token)
		return nil
	case config.RoleID != "" && config.SecretID != "":
		return vsm.authenticateWithAppRole(config.RoleID, config.SecretID)
	case config.K8SMountPath != "":
		return vsm.authenticateWithK8S(config.K8SMountPath)
	default:
		return fmt.Errorf("no valid authentication method provided")
	}
}

// authenticateWithAppRole logs in at "auth/approle/login" with the given
// RoleID/SecretID pair and installs the returned client token on the client.
// It fails when the request errors or the response carries no client token.
func (vsm *VaultSecretsManager) authenticateWithAppRole(roleID, secretID string) error {
	credentials := map[string]interface{}{
		"role_id":   roleID,
		"secret_id": secretID,
	}

	resp, err := vsm.client.Logical().Write("auth/approle/login", credentials)
	if err != nil {
		return fmt.Errorf("AppRole authentication failed: %w", err)
	}

	hasToken := resp != nil && resp.Auth != nil && resp.Auth.ClientToken != ""
	if !hasToken {
		return fmt.Errorf("no token received from AppRole authentication")
	}

	auth := resp.Auth
	vsm.client.SetToken(auth.ClientToken)
	// The logged value is the lease duration reported by the login response.
	vsm.logger.Printf("Authenticated with AppRole, token expires: %v", auth.LeaseDuration)

	return nil
}

// authenticateWithK8S logs in through the Kubernetes auth method mounted at
// mountPath, presenting the pod's service-account JWT, and installs the
// returned client token on the Vault client.
//
// Only the service-account token is sent with the login request, so no other
// pod metadata (hostname, namespace file) is read. The login role is fixed to
// "default".
//
// It returns an error when the token file cannot be read, the login request
// fails, or the response carries no client token.
func (vsm *VaultSecretsManager) authenticateWithK8S(mountPath string) error {
	// Read JWT token from service account
	jwt, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return fmt.Errorf("failed to read Kubernetes service account token: %w", err)
	}

	// Authenticate with Vault
	secret, err := vsm.client.Logical().Write(fmt.Sprintf("auth/%s/login", mountPath), map[string]interface{}{
		"jwt":  string(jwt),
		"role": "default", // This should be configurable
	})
	if err != nil {
		return fmt.Errorf("Kubernetes authentication failed: %w", err)
	}

	if secret == nil || secret.Auth == nil || secret.Auth.ClientToken == "" {
		return fmt.Errorf("no token received from Kubernetes authentication")
	}

	vsm.client.SetToken(secret.Auth.ClientToken)
	vsm.logger.Printf("Authenticated with Kubernetes, token expires: %v", secret.Auth.LeaseDuration)

	return nil
}

// initializeDefaultPolicies writes the built-in "admin", "readonly",
// "app-developer" and "app-readonly" ACL policies through createPolicy.
//
// Every policy is attempted independently: a failure is logged and the loop
// moves on to the next one, so the returned error is always nil.
func (vsm *VaultSecretsManager) initializeDefaultPolicies() error {
	defaults := map[string]string{
		"admin": `
path "sys/*" {
  capabilities = ["create", "read", "update", "delete", "list", "sudo"]
}

path "auth/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}

path "secret/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}

path "pki/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}

path "kv/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}
`,

		"readonly": `
path "secret/*" {
  capabilities = ["read", "list"]
}

path "kv/*" {
  capabilities = ["read", "list"]
}

path "pki/issue/*" {
  capabilities = ["update"]
}

path "pki/cert/*" {
  capabilities = ["read", "list"]
}
`,

		"app-developer": `
path "kv/data/apps/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}

path "kv/metadata/apps/*" {
  capabilities = ["read", "update", "delete", "list"]
}

path "pki/issue/*" {
  capabilities = ["update"]
}

path "transit/encrypt/*" {
  capabilities = ["update"]
}

path "transit/decrypt/*" {
  capabilities = ["update"]
}
`,

		"app-readonly": `
path "kv/data/apps/*" {
  capabilities = ["read", "list"]
}

path "kv/metadata/apps/*" {
  capabilities = ["read", "list"]
}

path "pki/issue/*" {
  capabilities = ["deny"]
}

path "transit/encrypt/*" {
  capabilities = ["deny"]
}

path "transit/decrypt/*" {
  capabilities = ["deny"]
}
`,
	}

	// Map iteration order is unspecified, so the policies are written in no fixed order.
	for policyName, policyRules := range defaults {
		if err := vsm.createPolicy(policyName, policyRules); err != nil {
			vsm.logger.Printf("Failed to create policy %s: %v", policyName, err)
			continue
		}
		vsm.logger.Printf("Created policy: %s", policyName)
	}

	return nil
}

// createPolicy writes rules as the ACL policy called name
// ("sys/policies/acl/<name>") and, on success, records the rules in the
// manager's local policies map. A write failure is returned wrapped and
// leaves the local map untouched.
func (vsm *VaultSecretsManager) createPolicy(name, rules string) error {
	endpoint := fmt.Sprintf("sys/policies/acl/%s", name)
	body := map[string]interface{}{"policy": rules}

	if _, err := vsm.client.Logical().Write(endpoint, body); err != nil {
		return fmt.Errorf("failed to create policy %s: %w", name, err)
	}

	vsm.policies[name] = rules
	return nil
}

// readPolicy fetches the ACL policy called name from "sys/policies/acl/<name>"
// and returns its rules text.
//
// The ACL policy endpoint returns the policy body under the "policy" key;
// the legacy "rules" key is still accepted as a fallback.
//
// It returns an error when the read fails, the policy does not exist, or the
// response holds the body under neither key as a string.
func (vsm *VaultSecretsManager) readPolicy(name string) (string, error) {
	secret, err := vsm.client.Logical().Read(fmt.Sprintf("sys/policies/acl/%s", name))
	if err != nil {
		return "", fmt.Errorf("failed to read policy %s: %w", name, err)
	}

	if secret == nil || secret.Data == nil {
		return "", fmt.Errorf("policy %s not found", name)
	}

	if rules, ok := secret.Data["policy"].(string); ok {
		return rules, nil
	}
	if rules, ok := secret.Data["rules"].(string); ok {
		return rules, nil
	}

	return "", fmt.Errorf("invalid policy data format")
}

// ListPolicies returns the names of all ACL policies.
//
// "sys/policies/acl" is a LIST endpoint whose response carries the names
// under "keys", so the request is issued as a list operation. The legacy
// "policies" key is accepted as a fallback.
//
// A missing or empty response yields an empty slice; entries that are not
// strings are skipped. An error is returned when the request fails or the
// response contains neither key as a list.
func (vsm *VaultSecretsManager) ListPolicies() ([]string, error) {
	secret, err := vsm.client.Logical().List("sys/policies/acl")
	if err != nil {
		return nil, fmt.Errorf("failed to list policies: %w", err)
	}

	if secret == nil || secret.Data == nil {
		return []string{}, nil
	}

	policiesData, ok := secret.Data["keys"].([]interface{})
	if !ok {
		policiesData, ok = secret.Data["policies"].([]interface{})
	}
	if !ok {
		return nil, fmt.Errorf("invalid policies data format")
	}

	policies := make([]string, 0, len(policiesData))
	for _, policy := range policiesData {
		if policyName, ok := policy.(string); ok {
			policies = append(policies, policyName)
		}
	}

	return policies, nil
}

// WriteSecret stores data at "kv/data/<path>" (KV v2 layout, "kv" mount).
// When ttl is non-nil its string form is sent as options.ttl.
// A write failure is returned wrapped; success is logged.
func (vsm *VaultSecretsManager) WriteSecret(path string, data map[string]interface{}, ttl *time.Duration) error {
	payload := map[string]interface{}{"data": data}

	if ttl != nil {
		// NOTE(review): KV v2 documents only "cas" under options — confirm
		// that the server actually honors a "ttl" entry here.
		payload["options"] = map[string]interface{}{"ttl": ttl.String()}
	}

	endpoint := fmt.Sprintf("kv/data/%s", path)
	if _, err := vsm.client.Logical().Write(endpoint, payload); err != nil {
		return fmt.Errorf("failed to write secret %s: %w", path, err)
	}

	vsm.logger.Printf("Secret written: %s", path)
	return nil
}

// ReadSecret returns the key/value payload stored at "kv/data/<path>".
// When the response also carries a metadata map, it is attached to the
// returned map under the "_metadata" key.
// It returns an error when the read fails, nothing is found, or the payload
// is not a map.
func (vsm *VaultSecretsManager) ReadSecret(path string) (map[string]interface{}, error) {
	resp, err := vsm.client.Logical().Read(fmt.Sprintf("kv/data/%s", path))
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to read secret %s: %w", path, err)
	case resp == nil || resp.Data == nil:
		return nil, fmt.Errorf("secret %s not found", path)
	}

	// KV v2 nests the user payload one level down, under "data".
	payload, isMap := resp.Data["data"].(map[string]interface{})
	if !isMap {
		return nil, fmt.Errorf("invalid secret data format")
	}

	if meta, hasMeta := resp.Data["metadata"].(map[string]interface{}); hasMeta {
		payload["_metadata"] = meta
	}

	return payload, nil
}

// ReadSecretWithVersion returns the payload of one specific version of the
// secret at "kv/data/<path>".
//
// When the response carries a metadata map, the requested version number is
// recorded in it as "requested_version" and the map is attached to the
// returned payload under "_metadata".
//
// It returns an error when the read fails, the version is not found, or the
// payload is not a map.
func (vsm *VaultSecretsManager) ReadSecretWithVersion(path string, version int) (map[string]interface{}, error) {
	secret, err := vsm.client.Logical().ReadWithData(fmt.Sprintf("kv/data/%s", path), map[string][]string{
		"version": {fmt.Sprintf("%d", version)},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to read secret %s version %d: %w", path, version, err)
	}

	if secret == nil || secret.Data == nil {
		return nil, fmt.Errorf("secret %s version %d not found", path, version)
	}

	data, ok := secret.Data["data"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid secret data format")
	}

	// Annotate the typed metadata map before storing it: once it sits in
	// data it is an interface{} value and can no longer be indexed.
	if metadata, exists := secret.Data["metadata"].(map[string]interface{}); exists {
		metadata["requested_version"] = version
		data["_metadata"] = metadata
	}

	return data, nil
}

// ListSecrets returns the entry names found under "kv/metadata/<path>".
// A missing or empty response yields an empty slice; entries that are not
// strings are skipped. An error is returned when the list request fails or
// the "keys" field is not a list.
func (vsm *VaultSecretsManager) ListSecrets(path string) ([]string, error) {
	endpoint := fmt.Sprintf("kv/metadata/%s", path)

	resp, err := vsm.client.Logical().List(endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to list secrets %s: %w", path, err)
	}
	if resp == nil || resp.Data == nil {
		return []string{}, nil
	}

	rawKeys, isList := resp.Data["keys"].([]interface{})
	if !isList {
		return nil, fmt.Errorf("invalid list data format")
	}

	var names []string
	for _, raw := range rawKeys {
		name, isString := raw.(string)
		if !isString {
			continue
		}
		names = append(names, name)
	}

	return names, nil
}

// DeleteSecret removes the secret at path from the KV v2 mount "kv".
//
// It first soft-deletes the latest version ("kv/data/<path>"); a failure
// there is returned. It then deletes the key's metadata
// ("kv/metadata/<path>"), which permanently removes every version. The
// "kv/destroy" endpoint is not used because it only accepts a write carrying
// an explicit list of versions. A failure of the permanent removal is logged
// as a warning and does not fail the call.
func (vsm *VaultSecretsManager) DeleteSecret(path string) error {
	// Delete latest version (soft delete)
	if _, err := vsm.client.Logical().Delete(fmt.Sprintf("kv/data/%s", path)); err != nil {
		return fmt.Errorf("failed to delete secret %s: %w", path, err)
	}

	// Permanently delete all versions together with the key's metadata
	if _, err := vsm.client.Logical().Delete(fmt.Sprintf("kv/metadata/%s", path)); err != nil {
		vsm.logger.Printf("Warning: failed to permanently destroy secret %s: %v", path, err)
	}

	vsm.logger.Printf("Secret deleted: %s", path)
	return nil
}

// DeleteSecretVersion permanently destroys the listed versions of the secret
// at path by writing them to "kv/destroy/<path>". The version numbers are
// sent as decimal strings. A write failure is returned wrapped.
func (vsm *VaultSecretsManager) DeleteSecretVersion(path string, versions []int) error {
	versionsData := make([]string, 0, len(versions))
	for _, v := range versions {
		versionsData = append(versionsData, fmt.Sprint(v))
	}

	endpoint := fmt.Sprintf("kv/destroy/%s", path)
	body := map[string]interface{}{"versions": versionsData}

	if _, err := vsm.client.Logical().Write(endpoint, body); err != nil {
		return fmt.Errorf("failed to destroy secret versions for %s: %w", path, err)
	}

	vsm.logger.Printf("Secret versions destroyed: %s, versions: %v", path, versions)
	return nil
}

// UndeleteSecretVersions restores the listed soft-deleted versions of the
// secret at path by writing them to "kv/undelete/<path>". The version
// numbers are sent as decimal strings. A write failure is returned wrapped.
func (vsm *VaultSecretsManager) UndeleteSecretVersions(path string, versions []int) error {
	versionsData := make([]string, 0, len(versions))
	for _, v := range versions {
		versionsData = append(versionsData, fmt.Sprint(v))
	}

	endpoint := fmt.Sprintf("kv/undelete/%s", path)
	body := map[string]interface{}{"versions": versionsData}

	if _, err := vsm.client.Logical().Write(endpoint, body); err != nil {
		return fmt.Errorf("failed to undelete secret versions for %s: %w", path, err)
	}

	vsm.logger.Printf("Secret versions undeleted: %s, versions: %v", path, versions)
	return nil
}

// GetSecretMetadata returns the raw response data read from
// "kv/metadata/<path>". It returns an error when the read fails or the
// response is empty.
func (vsm *VaultSecretsManager) GetSecretMetadata(path string) (map[string]interface{}, error) {
	endpoint := fmt.Sprintf("kv/metadata/%s", path)

	resp, err := vsm.client.Logical().Read(endpoint)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to read metadata for %s: %w", path, err)
	case resp == nil || resp.Data == nil:
		return nil, fmt.Errorf("metadata for %s not found", path)
	}

	return resp.Data, nil
}

// UpdateSecretMetadata writes metadata as-is to "kv/metadata/<path>".
// A write failure is returned wrapped; success is logged.
func (vsm *VaultSecretsManager) UpdateSecretMetadata(path string, metadata map[string]interface{}) error {
	endpoint := fmt.Sprintf("kv/metadata/%s", path)

	if _, err := vsm.client.Logical().Write(endpoint, metadata); err != nil {
		return fmt.Errorf("failed to update metadata for %s: %w", path, err)
	}

	vsm.logger.Printf("Metadata updated for: %s", path)
	return nil
}

// CreateToken requests a new token carrying the given policies, TTL and
// renewable flag, and returns the client token string.
// It returns an error when the request fails or the response has no token.
func (vsm *VaultSecretsManager) CreateToken(policies []string, ttl time.Duration, renewable bool) (string, error) {
	req := api.TokenCreateRequest{
		Policies:  policies,
		TTL:       ttl.String(),
		Renewable: &renewable,
	}

	resp, err := vsm.client.Auth().Token().Create(&req)
	if err != nil {
		return "", fmt.Errorf("failed to create token: %w", err)
	}

	gotToken := resp != nil && resp.Auth != nil && resp.Auth.ClientToken != ""
	if !gotToken {
		return "", fmt.Errorf("no token received")
	}

	vsm.logger.Printf("Token created with policies: %v, TTL: %v", policies, ttl)
	return resp.Auth.ClientToken, nil
}

// RenewToken renews token by asking Vault to extend it by ttl.
//
// The renewal is a renew-self call made from a cloned client that carries
// the token being renewed, so the manager's own token is left untouched.
// ttl is passed to Vault as a whole number of seconds.
//
// It returns an error when the client cannot be cloned, the renewal request
// fails, or the response carries no auth data.
func (vsm *VaultSecretsManager) RenewToken(token string, ttl time.Duration) error {
	// Create a temporary client with the token to renew
	tempClient, err := vsm.client.Clone()
	if err != nil {
		return fmt.Errorf("failed to clone Vault client: %w", err)
	}
	tempClient.SetToken(token)

	// RenewSelf expects the increment in seconds, not a time.Duration.
	secret, err := tempClient.Auth().Token().RenewSelf(int(ttl.Seconds()))
	if err != nil {
		return fmt.Errorf("failed to renew token: %w", err)
	}

	if secret == nil || secret.Auth == nil {
		return fmt.Errorf("no token data received during renewal")
	}

	vsm.logger.Printf("Token renewed, new TTL: %v", secret.Auth.LeaseDuration)
	return nil
}

// RevokeToken revokes token using the manager's own credentials.
//
// The Vault API client exposes the standard revoke operation as RevokeTree,
// which revokes the token together with its child tokens.
// A failure is returned wrapped; success is logged without the token value.
func (vsm *VaultSecretsManager) RevokeToken(token string) error {
	if err := vsm.client.Auth().Token().RevokeTree(token); err != nil {
		return fmt.Errorf("failed to revoke token: %w", err)
	}

	vsm.logger.Println("Token revoked")
	return nil
}

// CreateAppRole writes an AppRole definition to "auth/approle/role/<name>"
// with the given policies, ttl used for both "ttl" and "period", and
// SecretID binding enabled. A write failure is returned wrapped.
func (vsm *VaultSecretsManager) CreateAppRole(name string, policies []string, ttl time.Duration) error {
	duration := ttl.String()

	// NOTE(review): current AppRole docs name these fields token_policies,
	// token_ttl and token_period — confirm the server accepts the short forms.
	roleSpec := map[string]interface{}{
		"policies":       policies,
		"ttl":            duration,
		"period":         duration,
		"bind_secret_id": true,
	}

	endpoint := fmt.Sprintf("auth/approle/role/%s", name)
	if _, err := vsm.client.Logical().Write(endpoint, roleSpec); err != nil {
		return fmt.Errorf("failed to create AppRole %s: %w", name, err)
	}

	vsm.logger.Printf("AppRole created: %s", name)
	return nil
}

// GetAppRoleSecretID asks Vault for a fresh SecretID for the named AppRole
// ("auth/approle/role/<appRole>/secret-id") and returns it.
// It returns an error when the request fails, the response is empty, or the
// "secret_id" field is not a string.
func (vsm *VaultSecretsManager) GetAppRoleSecretID(appRole string) (string, error) {
	endpoint := fmt.Sprintf("auth/approle/role/%s/secret-id", appRole)

	resp, err := vsm.client.Logical().Write(endpoint, nil)
	switch {
	case err != nil:
		return "", fmt.Errorf("failed to generate SecretID for AppRole %s: %w", appRole, err)
	case resp == nil || resp.Data == nil:
		return "", fmt.Errorf("no SecretID data received")
	}

	id, isString := resp.Data["secret_id"].(string)
	if !isString {
		return "", fmt.Errorf("invalid SecretID data format")
	}

	vsm.logger.Printf("SecretID generated for AppRole: %s", appRole)
	return id, nil
}

// GetAppRoleRoleID reads the RoleID of the named AppRole from
// "auth/approle/role/<appRole>/role-id".
// It returns an error when the read fails, the response is empty, or the
// "role_id" field is not a string.
func (vsm *VaultSecretsManager) GetAppRoleRoleID(appRole string) (string, error) {
	endpoint := fmt.Sprintf("auth/approle/role/%s/role-id", appRole)

	resp, err := vsm.client.Logical().Read(endpoint)
	switch {
	case err != nil:
		return "", fmt.Errorf("failed to get RoleID for AppRole %s: %w", appRole, err)
	case resp == nil || resp.Data == nil:
		return "", fmt.Errorf("no RoleID data received")
	}

	id, isString := resp.Data["role_id"].(string)
	if !isString {
		return "", fmt.Errorf("invalid RoleID data format")
	}

	return id, nil
}

// Health returns the result of the client's system health call unchanged.
func (vsm *VaultSecretsManager) Health() (*api.HealthResponse, error) {
	sys := vsm.client.Sys()
	return sys.Health()
}

// SealStatus returns the result of the client's seal-status call unchanged.
func (vsm *VaultSecretsManager) SealStatus() (*api.SealStatusResponse, error) {
	sys := vsm.client.Sys()
	return sys.SealStatus()
}

// BackupSecrets walks a fixed set of base paths and returns a map holding a
// UTC "timestamp" and a "secrets" map filled by backupSecretPath.
// A base path that fails is logged as a warning and skipped, so the returned
// error is always nil.
func (vsm *VaultSecretsManager) BackupSecrets() (map[string]interface{}, error) {
	collected := make(map[string]interface{})
	backup := map[string]interface{}{
		"timestamp": time.Now().UTC(),
		"secrets":   collected,
	}

	// NOTE(review): "" is walked recursively, so the named prefixes below
	// look like repeat visits whose entries land directly in the top-level
	// map — confirm this is intended.
	for _, basePath := range []string{"", "apps/", "infrastructure/", "users/"} {
		if err := vsm.backupSecretPath(basePath, collected); err != nil {
			vsm.logger.Printf("Warning: failed to backup path %s: %v", basePath, err)
		}
	}

	return backup, nil
}

// backupSecretPath copies everything listed under path into backup, keyed by
// entry name. Entries ending in "/" are treated as folders and recursed into
// (stored as a nested map); all others are read with ReadSecret.
// The first list or read error aborts the walk and is returned.
func (vsm *VaultSecretsManager) backupSecretPath(path string, backup map[string]interface{}) error {
	entries, err := vsm.ListSecrets(path)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		fullPath := path + entry
		isFolder := len(entry) > 0 && entry[len(entry)-1] == '/'

		if !isFolder {
			// Leaf entry: read the secret and store its payload.
			secretData, err := vsm.ReadSecret(fullPath)
			if err != nil {
				return err
			}
			backup[entry] = secretData
			continue
		}

		// Folder entry: collect its contents into a nested map.
		nested := make(map[string]interface{})
		if err := vsm.backupSecretPath(fullPath, nested); err != nil {
			return err
		}
		backup[entry] = nested
	}

	return nil
}

// Close logs that the manager has been closed. It releases nothing else.
func (vsm *VaultSecretsManager) Close() {
	const closedMsg = "Vault secrets manager closed"
	vsm.logger.Println(closedMsg)
}

// main demonstrates the secrets manager end to end: connect using settings
// from the environment, run a health check, write and read a secret, create
// a policy and an AppRole, fetch the AppRole credentials, and mint a token.
// Only a failure to create the manager is fatal; every later step logs its
// error and continues.
func main() {
	// Configuration
	config := VaultConfig{
		Address:      os.Getenv("VAULT_ADDR"),
		RoleID:       os.Getenv("VAULT_ROLE_ID"),
		SecretID:     os.Getenv("VAULT_SECRET_ID"),
		K8SMountPath: "kubernetes",
		Namespace:    "admin",
	}

	// Create secrets manager
	vsm, err := NewVaultSecretsManager(config)
	if err != nil {
		log.Fatalf("Failed to create Vault secrets manager: %v", err)
	}
	defer vsm.Close()

	// Health check
	health, err := vsm.Health()
	if err != nil {
		log.Printf("Health check failed: %v", err)
	} else {
		log.Printf("Vault health: %v, version: %s", health.Initialized, health.Version)
	}

	// Example: Write a secret
	appSecret := map[string]interface{}{
		"database_url": "postgresql://localhost:5432/myapp",
		"api_key":      "sk-1234567890abcdef",
		"debug_mode":   true,
	}

	// The TTL must live in a variable: the address of a constant expression
	// cannot be taken.
	secretTTL := 7 * 24 * time.Hour // 7 days TTL
	err = vsm.WriteSecret("apps/myapp/config", appSecret, &secretTTL)
	if err != nil {
		log.Printf("Failed to write secret: %v", err)
	}

	// Example: Read a secret
	secretData, err := vsm.ReadSecret("apps/myapp/config")
	if err != nil {
		log.Printf("Failed to read secret: %v", err)
	} else {
		log.Printf("Secret data: %+v", secretData)
	}

	// Example: Create a policy
	policy := `
path "kv/data/apps/myapp/*" {
  capabilities = ["create", "read", "update", "delete"]
}
`

	err = vsm.createPolicy("myapp-policy", policy)
	if err != nil {
		log.Printf("Failed to create policy: %v", err)
	}

	// Example: Create an AppRole
	err = vsm.CreateAppRole("myapp-approle", []string{"myapp-policy"}, time.Hour*24)
	if err != nil {
		log.Printf("Failed to create AppRole: %v", err)
	}

	// Example: Get RoleID and SecretID
	roleID, err := vsm.GetAppRoleRoleID("myapp-approle")
	if err != nil {
		log.Printf("Failed to get RoleID: %v", err)
	} else {
		log.Printf("AppRole RoleID: %s", roleID)
	}

	secretID, err := vsm.GetAppRoleSecretID("myapp-approle")
	if err != nil {
		log.Printf("Failed to get SecretID: %v", err)
	} else {
		log.Printf("AppRole SecretID: %s", secretID)
	}

	// Example: Create a token
	token, err := vsm.CreateToken([]string{"myapp-policy"}, time.Hour*8, true)
	if err != nil {
		log.Printf("Failed to create token: %v", err)
	} else {
		// Log only a short prefix, and never slice past the token's length.
		preview := token
		if len(preview) > 20 {
			preview = preview[:20]
		}
		log.Printf("Created token: %s", preview+"...")
	}
}

// VaultHandler is an example HTTP adapter for use with web frameworks.
// Its handler methods delegate secret reads and writes to the wrapped manager.
type VaultHandler struct {
	// vsm is the secrets manager the handlers call into.
	vsm *VaultSecretsManager
}

// NewVaultHandler creates a secrets manager from config and wraps it in a
// VaultHandler. An error from NewVaultSecretsManager is returned unchanged.
func NewVaultHandler(config VaultConfig) (*VaultHandler, error) {
	manager, err := NewVaultSecretsManager(config)
	if err != nil {
		return nil, err
	}

	handler := &VaultHandler{vsm: manager}
	return handler, nil
}

// GetSecretHandler serves the secret named by the "path" query parameter as
// JSON. It answers 400 when the parameter is missing and 500, with the error
// text, when the read fails.
func (vh *VaultHandler) GetSecretHandler(w http.ResponseWriter, r *http.Request) {
	secretPath := r.URL.Query().Get("path")
	if secretPath == "" {
		http.Error(w, "path parameter is required", http.StatusBadRequest)
		return
	}

	payload, err := vh.vsm.ReadSecret(secretPath)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	encoder := json.NewEncoder(w)
	// The encode error is deliberately dropped: the response has already
	// started, so there is no status left to change.
	_ = encoder.Encode(payload)
}

// WriteSecretHandler stores a secret described by a JSON request body of the
// form {"path": ..., "data": {...}, "ttl": "<duration>"} (ttl optional).
//
// Responses:
//   - 400 when the body is not valid JSON, path is empty, data is missing,
//     or ttl is not a valid Go duration string;
//   - 500, with the error text, when the write to Vault fails;
//   - 201 with {"status": "success"} as JSON otherwise.
func (vh *VaultHandler) WriteSecretHandler(w http.ResponseWriter, r *http.Request) {
	var req struct {
		Path string                 `json:"path"`
		Data map[string]interface{} `json:"data"`
		TTL  string                 `json:"ttl,omitempty"`
	}

	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Reject incomplete requests here rather than forwarding them to Vault,
	// mirroring the path check in GetSecretHandler.
	if req.Path == "" {
		http.Error(w, "path parameter is required", http.StatusBadRequest)
		return
	}
	if req.Data == nil {
		http.Error(w, "data parameter is required", http.StatusBadRequest)
		return
	}

	var ttl *time.Duration
	if req.TTL != "" {
		parsedTTL, err := time.ParseDuration(req.TTL)
		if err != nil {
			http.Error(w, "invalid TTL format", http.StatusBadRequest)
			return
		}
		ttl = &parsedTTL
	}

	if err := vh.vsm.WriteSecret(req.Path, req.Data, ttl); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Headers must be set before WriteHeader; later changes are ignored.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	if err := json.NewEncoder(w).Encode(map[string]string{"status": "success"}); err != nil {
		// The status line is already sent, so the failure can only be logged.
		vh.vsm.logger.Printf("Warning: failed to encode write response: %v", err)
	}
}

// Export
export { VaultSecretsManager, VaultConfig, SecretData, Policy };

💻 Moteur de Transit Vault python

🔴 complex ⭐⭐⭐⭐⭐

Implémentation du chiffrement en tant que service (Encryption as a Service) avec le moteur Transit de Vault, incluant la rotation des clés et le chiffrement des données

⏱️ 60 min 🏷️ vault, transit, encryption, python, security
Prerequisites: Python, HashiCorp Vault, Cryptography, Security concepts
#!/usr/bin/env python3
"""
Vault Transit Secrets Engine Implementation
Encryption as a Service with key rotation and secure cryptographic operations
"""

import base64
import json
import logging
import os
import secrets
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any, Union
from datetime import datetime, timedelta

import hvac
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend

# Configure logging: set up the root logger at INFO level (this runs as an
# import-time side effect) and create the logger used throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class EncryptionKey:
    """Encryption key information.

    Plain data record describing one named key. In the part of the file
    reviewed here it is only referenced as the value type of
    ``VaultTransitClient._key_cache``.
    """
    name: str
    type: str  # aes256-gcm, chacha20-poly1305, ed25519, rsa-2048, etc.
    # presumably the key version this record describes — TODO confirm against the code that builds it
    version: int
    created_at: datetime
    latest_version: int
    min_decryption_version: int
    # Flags below: assumed to mirror what the key-info source reports — verify.
    deletion_allowed: bool
    exportable: bool
    supports_encryption: bool
    supports_decryption: bool
    supports_signing: bool
    supports_derivation: bool


@dataclass
class EncryptionResult:
    """Result of encryption operation.

    Carries the ciphertext together with the name, version and type of the
    key involved and a timestamp. ``iv`` and ``tag`` may be None; ``context``
    is optional and defaults to None.
    """
    ciphertext: str
    # iv/tag are Optional — presumably absent when the ciphertext embeds them; TODO confirm
    iv: Optional[str]
    tag: Optional[str]
    key_version: int
    key_name: str
    encryption_type: str
    timestamp: datetime
    # Only field with a default, so it must stay last.
    context: Optional[Dict[str, Any]] = None


@dataclass
class DecryptionResult:
    """Result of decryption operation.

    Carries the recovered plaintext together with the name, version and type
    of the key involved and a timestamp.
    """
    # NOTE(review): typed as str — confirm whether callers expect decoded text or a base64 string
    plaintext: str
    key_name: str
    key_version: int
    encryption_type: str
    timestamp: datetime


@dataclass
class SignatureResult:
    """Result of signing operation.

    Carries the signature string together with the name and version of the
    signing key, the algorithm label and a timestamp.
    """
    signature: str
    key_name: str
    key_version: int
    algorithm: str
    timestamp: datetime


@dataclass
class KeyRotationResult:
    """Result of key rotation.

    Records the key version before and after the rotation, when it happened,
    whether it succeeded, and a human-readable message.
    """
    previous_version: int
    new_version: int
    rotated_at: datetime
    success: bool
    message: str


class VaultTransitClient:
    """
    Vault Transit Secrets Engine Client
    Provides encryption as a service with key management
    """

    def __init__(
        self,
        vault_url: str,
        token: str,
        mount_path: str = "transit",
        namespace: Optional[str] = None
    ):
        """
        Set up a client bound to one Transit mount

        The constructor stores the connection settings, builds the underlying
        ``hvac.Client``, checks that Vault answers the health probe and then
        prepares an empty in-memory cache for key metadata.

        Args:
            vault_url: Vault server URL
            token: Vault authentication token
            mount_path: Transit engine mount path (trailing slashes are removed)
            namespace: Vault namespace (optional)

        Raises:
            Exception: whatever ``_verify_connection`` raises when the health
                check fails
        """
        # Connection settings, kept for later reference (e.g. backups).
        self.vault_url = vault_url
        self.token = token
        self.mount_path = mount_path.rstrip('/')
        self.namespace = namespace

        # Underlying hvac client used for every request.
        connection_settings = {
            "url": vault_url,
            "token": token,
            "namespace": namespace,
        }
        self.client = hvac.Client(**connection_settings)

        # Fail fast if Vault cannot be reached or is not initialized.
        self._verify_connection()

        # Key metadata cache: entries expire after 300 seconds (5 minutes).
        self._key_cache: Dict[str, EncryptionKey] = {}
        self._cache_ttl = 300
        self._last_cache_update = {}

        logger.info(f"Vault Transit client initialized: {vault_url}")

    def _verify_connection(self):
        """Verify connection to Vault"""
        try:
            health = self.client.sys.read_health_status()
            if not health['initialized']:
                raise Exception("Vault is not initialized")
            logger.info("Vault connection verified")
        except Exception as e:
            logger.error(f"Failed to connect to Vault: {e}")
            raise

    def create_key(
        self,
        key_name: str,
        key_type: str = "aes256-gcm",
        exportable: bool = False,
        allow_plaintext_backup: bool = False,
        auto_rotate_period: Optional[str] = None,
        convergent_encryption: bool = False
    ) -> bool:
        """
        Create a new encryption key

        Args:
            key_name: Name of the key
            key_type: Type of key (aes256-gcm, chacha20-poly1305, ed25519, rsa-2048, etc.)
            exportable: Whether the key can be exported
            allow_plaintext_backup: Whether plaintext backup is allowed
            auto_rotate_period: Auto-rotation period (e.g., "30d", "1h")
            convergent_encryption: Whether to use convergent encryption

        Returns:
            True if key was created successfully
        """
        try:
            config = {
                "type": key_type,
                "exportable": exportable,
                "allow_plaintext_backup": allow_plaintext_backup,
                "convergent_encryption": convergent_encryption
            }

            if auto_rotate_period:
                config["auto_rotate_period"] = auto_rotate_period

            path = f"{self.mount_path}/keys/{key_name}"
            self.client.write(path, config)

            logger.info(f"Created key: {key_name} (type: {key_type})")

            # Clear cache for this key
            self._key_cache.pop(key_name, None)

            return True

        except Exception as e:
            logger.error(f"Failed to create key {key_name}: {e}")
            return False

    def get_key_info(self, key_name: str, use_cache: bool = True) -> Optional[EncryptionKey]:
        """
        Get information about an encryption key

        Args:
            key_name: Name of the key
            use_cache: Whether to use cached information

        Returns:
            EncryptionKey object or None if key not found
        """
        # Check cache first
        if (use_cache and key_name in self._key_cache and
                time.time() - self._last_cache_update.get(key_name, 0) < self._cache_ttl):
            return self._key_cache[key_name]

        try:
            path = f"{self.mount_path}/keys/{key_name}"
            response = self.client.read(path)

            if not response or 'data' not in response:
                return None

            data = response['data']
            key_info = EncryptionKey(
                name=key_name,
                type=data['type'],
                version=data['latest_version'],
                created_at=datetime.fromisoformat(data['creation_time'].replace('Z', '+00:00')),
                latest_version=data['latest_version'],
                min_decryption_version=data['min_decryption_version'],
                deletion_allowed=data.get('deletion_allowed', False),
                exportable=data.get('exportable', False),
                supports_encryption=data.get('supports_encryption', False),
                supports_decryption=data.get('supports_decryption', False),
                supports_signing=data.get('supports_signing', False),
                supports_derivation=data.get('supports_derivation', False)
            )

            # Update cache
            self._key_cache[key_name] = key_info
            self._last_cache_update[key_name] = time.time()

            return key_info

        except Exception as e:
            logger.error(f"Failed to get key info for {key_name}: {e}")
            return None

    def list_keys(self) -> List[str]:
        """List all available keys"""
        try:
            path = f"{self.mount_path}/keys"
            response = self.client.list(path)

            if not response or 'data' not in response:
                return []

            keys = response['data'].get('keys', [])
            return keys

        except Exception as e:
            logger.error(f"Failed to list keys: {e}")
            return []

    def encrypt(
        self,
        key_name: str,
        plaintext: Union[str, bytes],
        context: Optional[Dict[str, Any]] = None,
        key_version: Optional[int] = None,
        nonce: Optional[bytes] = None
    ) -> EncryptionResult:
        """
        Encrypt data using Vault Transit engine

        Args:
            key_name: Name of the encryption key
            plaintext: Data to encrypt (string or bytes)
            context: Additional context for convergent encryption
            key_version: Specific key version to use (optional)
            nonce: Custom nonce for deterministic encryption (optional)

        Returns:
            EncryptionResult object
        """
        try:
            # Convert plaintext to base64 if it's bytes
            if isinstance(plaintext, bytes):
                plaintext_b64 = base64.b64encode(plaintext).decode('utf-8')
            else:
                plaintext_b64 = base64.b64encode(plaintext.encode('utf-8')).decode('utf-8')

            # Prepare encryption request
            encrypt_data = {"plaintext": plaintext_b64}

            if context:
                # Convert context to base64
                context_json = json.dumps(context, separators=(',', ':'))
                context_b64 = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')
                encrypt_data["context"] = context_b64

            if key_version:
                encrypt_data["key_version"] = key_version

            if nonce:
                encrypt_data["nonce"] = base64.b64encode(nonce).decode('utf-8')

            path = f"{self.mount_path}/encrypt/{key_name}"
            response = self.client.write(path, encrypt_data)

            if not response or 'data' not in response:
                raise Exception("No response from Vault")

            data = response['data']

            result = EncryptionResult(
                ciphertext=data['ciphertext'],
                iv=data.get('iv'),
                tag=data.get('tag'),
                key_version=data['key_version'],
                key_name=key_name,
                encryption_type="vault-transit",
                timestamp=datetime.utcnow(),
                context=context
            )

            logger.info(f"Encrypted data with key {key_name}, version {result.key_version}")
            return result

        except Exception as e:
            logger.error(f"Failed to encrypt data with key {key_name}: {e}")
            raise

    def decrypt(
        self,
        key_name: str,
        ciphertext: str,
        context: Optional[Dict[str, Any]] = None,
        key_version: Optional[int] = None
    ) -> DecryptionResult:
        """
        Decrypt data using Vault Transit engine

        Args:
            key_name: Name of the decryption key
            ciphertext: Ciphertext to decrypt
            context: Context used during encryption (if applicable)
            key_version: Expected key version (optional)

        Returns:
            DecryptionResult object
        """
        try:
            # Prepare decryption request
            decrypt_data = {"ciphertext": ciphertext}

            if context:
                # Convert context to base64
                context_json = json.dumps(context, separators=(',', ':'))
                context_b64 = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')
                decrypt_data["context"] = context_b64

            if key_version:
                decrypt_data["key_version"] = key_version

            path = f"{self.mount_path}/decrypt/{key_name}"
            response = self.client.write(path, decrypt_data)

            if not response or 'data' not in response:
                raise Exception("No response from Vault")

            data = response['data']
            plaintext_b64 = data['plaintext']

            # Decode base64 and return as string
            plaintext_bytes = base64.b64decode(plaintext_b64)
            plaintext = plaintext_bytes.decode('utf-8')

            result = DecryptionResult(
                plaintext=plaintext,
                key_name=key_name,
                key_version=data.get('key_version', 0),
                encryption_type="vault-transit",
                timestamp=datetime.utcnow()
            )

            logger.info(f"Decrypted data with key {key_name}, version {result.key_version}")
            return result

        except Exception as e:
            logger.error(f"Failed to decrypt data with key {key_name}: {e}")
            raise

    def sign_data(
        self,
        key_name: str,
        input_data: Union[str, bytes],
        algorithm: str = "sha2-256",
        context: Optional[Dict[str, Any]] = None,
        key_version: Optional[int] = None
    ) -> SignatureResult:
        """
        Sign data using Vault Transit engine

        Args:
            key_name: Name of the signing key
            input_data: Data to sign (string or bytes)
            algorithm: Hash algorithm to use
            context: Additional context
            key_version: Specific key version to use (optional)

        Returns:
            SignatureResult object
        """
        try:
            # Convert input data to base64
            if isinstance(input_data, bytes):
                input_b64 = base64.b64encode(input_data).decode('utf-8')
            else:
                input_b64 = base64.b64encode(input_data.encode('utf-8')).decode('utf-8')

            # Prepare signing request
            sign_data = {
                "input": input_b64,
                "algorithm": algorithm
            }

            if context:
                context_json = json.dumps(context, separators=(',', ':'))
                context_b64 = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')
                sign_data["context"] = context_b64

            if key_version:
                sign_data["key_version"] = key_version

            path = f"{self.mount_path}/sign/{key_name}/{algorithm}"
            response = self.client.write(path, sign_data)

            if not response or 'data' not in response:
                raise Exception("No response from Vault")

            data = response['data']

            result = SignatureResult(
                signature=data['signature'],
                key_name=key_name,
                key_version=data.get('key_version', 0),
                algorithm=algorithm,
                timestamp=datetime.utcnow()
            )

            logger.info(f"Signed data with key {key_name}, algorithm {algorithm}")
            return result

        except Exception as e:
            logger.error(f"Failed to sign data with key {key_name}: {e}")
            raise

    def verify_signature(
        self,
        key_name: str,
        input_data: Union[str, bytes],
        signature: str,
        algorithm: str = "sha2-256",
        context: Optional[Dict[str, Any]] = None,
        hmac: bool = False
    ) -> bool:
        """
        Verify a signature using Vault Transit engine

        Args:
            key_name: Name of the verification key
            input_data: Original data that was signed
            signature: Signature to verify
            algorithm: Hash algorithm that was used
            context: Context used during signing (if applicable)
            hmac: Whether to use HMAC verification

        Returns:
            True if signature is valid, False otherwise
        """
        try:
            # Convert input data to base64
            if isinstance(input_data, bytes):
                input_b64 = base64.b64encode(input_data).decode('utf-8')
            else:
                input_b64 = base64.b64encode(input_data.encode('utf-8')).decode('utf-8')

            # Prepare verification request
            verify_data = {
                "input": input_b64,
                "algorithm": algorithm,
                "signature": signature,
                "hmac": hmac
            }

            if context:
                context_json = json.dumps(context, separators=(',', ':'))
                context_b64 = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')
                verify_data["context"] = context_b64

            path = f"{self.mount_path}/verify/{key_name}/{algorithm}"
            response = self.client.write(path, verify_data)

            if not response or 'data' not in response:
                raise Exception("No response from Vault")

            data = response['data']
            is_valid = data.get('valid', False)

            logger.info(f"Signature verification for key {key_name}: {'valid' if is_valid else 'invalid'}")
            return is_valid

        except Exception as e:
            logger.error(f"Failed to verify signature with key {key_name}: {e}")
            return False

    def rotate_key(self, key_name: str) -> KeyRotationResult:
        """
        Rotate an encryption key

        Args:
            key_name: Name of the key to rotate

        Returns:
            KeyRotationResult object
        """
        try:
            # Get current key info before rotation
            current_key_info = self.get_key_info(key_name, use_cache=False)
            if not current_key_info:
                raise Exception(f"Key {key_name} not found")

            previous_version = current_key_info.latest_version

            # Rotate key
            path = f"{self.mount_path}/keys/{key_name}/rotate"
            response = self.client.write(path, {})

            if not response:
                raise Exception("No response from Vault during rotation")

            # Get updated key info
            updated_key_info = self.get_key_info(key_name, use_cache=False)
            if not updated_key_info:
                raise Exception("Failed to get updated key info")

            result = KeyRotationResult(
                previous_version=previous_version,
                new_version=updated_key_info.latest_version,
                rotated_at=datetime.utcnow(),
                success=True,
                message=f"Key rotated from version {previous_version} to {updated_key_info.latest_version}"
            )

            logger.info(result.message)
            return result

        except Exception as e:
            error_msg = f"Failed to rotate key {key_name}: {e}"
            logger.error(error_msg)
            return KeyRotationResult(
                previous_version=0,
                new_version=0,
                rotated_at=datetime.utcnow(),
                success=False,
                message=error_msg
            )

    def configure_key_rotation(
        self,
        key_name: str,
        auto_rotate_period: str,
        min_available_version: Optional[int] = None
    ) -> bool:
        """
        Configure automatic key rotation

        Args:
            key_name: Name of the key
            auto_rotate_period: Rotation period (e.g., "30d", "1h")
            min_available_version: Minimum number of versions to keep

        Returns:
            True if configuration was successful
        """
        try:
            config = {
                "auto_rotate_period": auto_rotate_period
            }

            if min_available_version:
                config["min_available_version"] = min_available_version

            path = f"{self.mount_path}/keys/{key_name}/config"
            self.client.write(path, config)

            logger.info(f"Configured key rotation for {key_name}: {auto_rotate_period}")
            return True

        except Exception as e:
            logger.error(f"Failed to configure key rotation for {key_name}: {e}")
            return False

    def export_key(
        self,
        key_name: str,
        key_version: Optional[int] = None,
        key_type: str = "encryption-key"
    ) -> Optional[Dict[str, Any]]:
        """
        Export a key (if exportable)

        Args:
            key_name: Name of the key
            key_version: Specific version to export (optional)
            key_type: Type of key to export (encryption-key, signing-key, etc.)

        Returns:
            Key data or None if export failed
        """
        try:
            path = f"{self.mount_path}/export/{key_type}/{key_name}"

            if key_version:
                path += f"?version={key_version}"

            response = self.client.read(path)

            if not response or 'data' not in response:
                return None

            key_data = response['data']
            logger.info(f"Exported key {key_name}, version {key_version or 'latest'}")
            return key_data

        except Exception as e:
            logger.error(f"Failed to export key {key_name}: {e}")
            return None

    def import_key(
        self,
        key_name: str,
        key_data: Dict[str, Any],
        key_type: str = "encryption-key"
    ) -> bool:
        """
        Import a key

        Args:
            key_name: Name for the imported key
            key_data: Key data to import
            key_type: Type of key being imported

        Returns:
            True if import was successful
        """
        try:
            path = f"{self.mount_path}/import/{key_type}/{key_name}"
            self.client.write(path, key_data)

            logger.info(f"Imported key {key_name} of type {key_type}")
            return True

        except Exception as e:
            logger.error(f"Failed to import key {key_name}: {e}")
            return False

    def delete_key(
        self,
        key_name: str,
        force: bool = False
    ) -> bool:
        """
        Delete a key

        Args:
            key_name: Name of the key to delete
            force: Force deletion even if not allowed

        Returns:
            True if deletion was successful
        """
        try:
            if not force:
                # Check if deletion is allowed
                key_info = self.get_key_info(key_name, use_cache=False)
                if not key_info:
                    return False

                if not key_info.deletion_allowed:
                    logger.error(f"Key {key_name} deletion is not allowed")
                    return False

            # Delete the key
            path = f"{self.mount_path}/keys/{key_name}"
            self.client.delete(path)

            logger.info(f"Deleted key: {key_name}")
            return True

        except Exception as e:
            logger.error(f"Failed to delete key {key_name}: {e}")
            return False

    def get_usage_stats(self, key_name: str) -> Optional[Dict[str, Any]]:
        """
        Get usage statistics for a key

        Args:
            key_name: Name of the key

        Returns:
            Usage statistics or None if not available
        """
        try:
            path = f"{self.mount_path}/keys/{key_name}/usage"
            response = self.client.read(path)

            if not response or 'data' not in response:
                return None

            return response['data']

        except Exception as e:
            logger.error(f"Failed to get usage stats for {key_name}: {e}")
            return None

    def backup_keys(self, backup_dir: str = "vault_transit_backup") -> bool:
        """
        Backup all keys and their configurations

        Args:
            backup_dir: Directory to save backups

        Returns:
            True if backup was successful
        """
        try:
            import os
            import json
            from datetime import datetime

            # Create backup directory
            if not os.path.exists(backup_dir):
                os.makedirs(backup_dir)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = os.path.join(backup_dir, f"transit_keys_backup_{timestamp}.json")

            keys = self.list_keys()
            backup_data = {
                "timestamp": timestamp,
                "vault_url": self.vault_url,
                "mount_path": self.mount_path,
                "keys": {}
            }

            for key_name in keys:
                try:
                    key_info = self.get_key_info(key_name, use_cache=False)
                    if key_info:
                        backup_data["keys"][key_name] = {
                            "type": key_info.type,
                            "version": key_info.version,
                            "latest_version": key_info.latest_version,
                            "min_decryption_version": key_info.min_decryption_version,
                            "supports_encryption": key_info.supports_encryption,
                            "supports_decryption": key_info.supports_decryption,
                            "supports_signing": key_info.supports_signing,
                            "exportable": key_info.exportable,
                            "deletion_allowed": key_info.deletion_allowed
                        }

                        # Export key if exportable
                        if key_info.exportable:
                            exported_key = self.export_key(key_name)
                            if exported_key:
                                backup_data["keys"][key_name]["exported_data"] = exported_key

                except Exception as e:
                    logger.error(f"Failed to backup key {key_name}: {e}")

            # Save backup
            with open(backup_file, 'w') as f:
                json.dump(backup_data, f, indent=2)

            logger.info(f"Backup saved to {backup_file}")
            return True

        except Exception as e:
            logger.error(f"Failed to backup keys: {e}")
            return False

    def close(self):
        """Close the Vault client connection"""
        self.client.close()
        logger.info("Vault Transit client connection closed")


class TransitEncryptor:
    """
    High-level encryption wrapper using Vault Transit
    """

    def __init__(self, transit_client: VaultTransitClient, default_key: str):
        """
        Initialize TransitEncryptor

        Args:
            transit_client: VaultTransitClient instance
            default_key: Default key name for encryption operations
        """
        self.client = transit_client
        self.default_key = default_key

    def encrypt_string(
        self,
        plaintext: str,
        key_name: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Encrypt a string

        Args:
            plaintext: String to encrypt
            key_name: Key name to use (optional, uses default if not provided)
            context: Encryption context (optional)

        Returns:
            Base64-encoded encrypted result (JSON)
        """
        key = key_name or self.default_key
        result = self.client.encrypt(key, plaintext, context=context)

        # Return as JSON for easy storage and transport
        return base64.b64encode(json.dumps({
            'ciphertext': result.ciphertext,
            'iv': result.iv,
            'tag': result.tag,
            'key_version': result.key_version,
            'key_name': result.key_name,
            'timestamp': result.timestamp.isoformat()
        }).encode()).decode()

    def decrypt_string(
        self,
        encrypted_data: str,
        key_name: Optional[str] = None
    ) -> str:
        """
        Decrypt a string

        Args:
            encrypted_data: Base64-encoded encrypted data
            key_name: Key name to use (optional, extracted from data if not provided)

        Returns:
            Decrypted string
        """
        # Decode and parse encrypted data
        try:
            data_json = base64.b64decode(encrypted_data.encode()).decode()
            data = json.loads(data_json)

            ciphertext = data['ciphertext']
            target_key = key_name or data['key_name']
            key_version = data.get('key_version')

        except Exception as e:
            # Try legacy format (just ciphertext)
            ciphertext = encrypted_data
            target_key = key_name or self.default_key
            key_version = None

        result = self.client.decrypt(target_key, ciphertext, key_version=key_version)
        return result.plaintext

    def encrypt_file(
        self,
        file_path: str,
        key_name: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> EncryptionResult:
        """
        Encrypt a file

        Args:
            file_path: Path to file to encrypt
            key_name: Key name to use (optional)
            context: Encryption context (optional)

        Returns:
            EncryptionResult object
        """
        with open(file_path, 'rb') as f:
            file_data = f.read()

        key = key_name or self.default_key
        return self.client.encrypt(key, file_data, context=context)

    def decrypt_file(
        self,
        encrypted_result: EncryptionResult,
        output_path: str
    ) -> bool:
        """
        Decrypt a file and write to disk

        Args:
            encrypted_result: EncryptionResult from file encryption
            output_path: Path to write decrypted file

        Returns:
            True if successful
        """
        try:
            result = self.client.decrypt(
                encrypted_result.key_name,
                encrypted_result.ciphertext,
                context=encrypted_result.context
            )

            # Write decrypted data to file
            with open(output_path, 'wb') as f:
                f.write(result.plaintext.encode('utf-8'))

            logger.info(f"File decrypted and saved to {output_path}")
            return True

        except Exception as e:
            logger.error(f"Failed to decrypt file: {e}")
            return False


# Example usage
def main():
    """
    Walk through the main VaultTransitClient / TransitEncryptor features

    Reads ``VAULT_URL``, ``VAULT_TOKEN`` and (optionally) ``VAULT_NAMESPACE``
    from the environment, then creates an encryption key and a signing key,
    encrypts and decrypts a sample message with and without context, signs
    and verifies it, rotates the encryption key, lists and backs up the keys
    and finally attempts to delete the demo keys. Errors raised along the way
    are printed; the client is always closed at the end.
    """
    token = os.getenv("VAULT_TOKEN")
    if not token:
        # Without a token every request would be rejected; stop early.
        print("Error: VAULT_TOKEN environment variable is not set")
        return

    # Initialize Vault Transit client
    client = VaultTransitClient(
        vault_url=os.getenv("VAULT_URL", "https://vault.example.com:8200"),
        token=token,
        mount_path="transit",
        namespace=os.getenv("VAULT_NAMESPACE", "secret")
    )

    try:
        # Create a new key
        key_name = "my-app-key"
        client.create_key(
            key_name=key_name,
            key_type="aes256-gcm",
            auto_rotate_period="90d"
        )

        # Create the key used for signing; the encryption key above cannot sign
        signing_key = f"{key_name}-signing"
        client.create_key(key_name=signing_key, key_type="ed25519")

        # Get key information
        key_info = client.get_key_info(key_name)
        print(f"Key info: {key_info}")

        # Initialize high-level encryptor
        encryptor = TransitEncryptor(client, key_name)

        # Encrypt a string
        secret_data = "This is my secret message"
        encrypted = encryptor.encrypt_string(secret_data)
        print(f"Encrypted: {encrypted[:50]}...")

        # Decrypt the string
        decrypted = encryptor.decrypt_string(encrypted)
        print(f"Decrypted: {decrypted}")

        # Encrypt with context
        context = {"user_id": "12345", "app": "myapp"}
        encrypted_with_context = client.encrypt(key_name, secret_data, context=context)
        print(f"Encrypted with context version: {encrypted_with_context.key_version}")

        # Decrypt with context
        decrypted_with_context = client.decrypt(key_name, encrypted_with_context.ciphertext, context=context)
        print(f"Decrypted with context: {decrypted_with_context.plaintext}")

        # Sign and verify data
        signature_result = client.sign_data(signing_key, secret_data, algorithm="sha2-256")
        print(f"Signature: {signature_result.signature[:50]}...")

        is_valid = client.verify_signature(signing_key, secret_data, signature_result.signature)
        print(f"Signature valid: {is_valid}")

        # Rotate key
        rotation_result = client.rotate_key(key_name)
        print(f"Key rotation: {rotation_result.message}")

        # List all keys
        keys = client.list_keys()
        print(f"Available keys: {keys}")

        # Backup keys
        client.backup_keys()

        # Clean up
        client.delete_key(key_name)
        client.delete_key(signing_key)

    except Exception as e:
        print(f"Error: {e}")
    finally:
        client.close()


# Run the demonstration only when this file is executed as a script.
if __name__ == "__main__":
    main()

# Example Flask integration
"""
import os

from flask import Flask, request, jsonify
from vault_transit import VaultTransitClient, TransitEncryptor

app = Flask(__name__)

# Initialize Vault Transit client
transit_client = VaultTransitClient(
    vault_url=os.getenv("VAULT_URL"),
    token=os.getenv("VAULT_TOKEN")
)

# Initialize encryptor
encryptor = TransitEncryptor(transit_client, "app-default-key")

@app.route('/api/encrypt', methods=['POST'])
def encrypt_endpoint():
    data = request.get_json()
    plaintext = data.get('plaintext')
    key_name = data.get('key_name')
    context = data.get('context')

    if not plaintext:
        return jsonify({'error': 'plaintext is required'}), 400

    try:
        encrypted = encryptor.encrypt_string(plaintext, key_name, context)
        return jsonify({'encrypted': encrypted})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/api/decrypt', methods=['POST'])
def decrypt_endpoint():
    data = request.get_json()
    encrypted_data = data.get('encrypted')
    key_name = data.get('key_name')

    if not encrypted_data:
        return jsonify({'error': 'encrypted data is required'}), 400

    try:
        decrypted = encryptor.decrypt_string(encrypted_data, key_name)
        return jsonify({'decrypted': decrypted})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
"""

# Public names of this module (the Python counterpart of an export list).
__all__ = ["VaultTransitClient", "TransitEncryptor", "EncryptionResult", "DecryptionResult"]