🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples HashiCorp Vault
Exemples d'implémentation HashiCorp Vault pour la gestion sécurisée des secrets, du chiffrement et du contrôle d'accès
💻 Gestion des Secrets Vault go
🟡 intermediate
⭐⭐⭐⭐
Implémentation complète de gestion des secrets Vault avec authentification, opérations CRUD et gestion des politiques
⏱️ 50 min
🏷️ vault, secrets, go, security, hashicorp
Prerequisites:
Go programming, HashiCorp Vault concepts, Secrets management, Security
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/hashicorp/vault/api"
	"golang.org/x/crypto/bcrypt"
)
// VaultSecretsManager bundles a Vault API client with a local record of the
// policies written through it and a prefixed logger.
type VaultSecretsManager struct {
	// client is the Vault API client every operation is issued through.
	client *api.Client
	// policies maps policy name to the rules last written by createPolicy.
	policies map[string]string
	// logger receives operational messages.
	logger *log.Logger
}
// VaultConfig carries the connection and credential settings consumed by
// NewVaultSecretsManager. Credentials are tried in the order Token,
// RoleID+SecretID (AppRole), then K8SMountPath (Kubernetes).
type VaultConfig struct {
	// Address is the Vault server address assigned to the client configuration.
	Address string
	// Token, when non-empty, is used directly as the client token.
	Token string
	// RoleID and SecretID together select AppRole login.
	RoleID   string
	SecretID string
	// K8SMountPath is the auth mount used for Kubernetes login.
	K8SMountPath string
	// Namespace, when non-empty, is set on the client.
	Namespace string
}
// SecretData is a JSON-serializable description of a stored secret: where it
// lives, its payload, optional metadata and TTL, and its timestamps.
type SecretData struct {
	// Path locates the secret.
	Path string `json:"path"`
	// Data is the secret payload.
	Data map[string]interface{} `json:"data"`
	// Metadata is optional and omitted from JSON when empty.
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	// TTL is optional; a nil pointer is omitted from JSON.
	TTL *time.Duration `json:"ttl,omitempty"`
	// CreatedAt and UpdatedAt are the creation and last-update timestamps.
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}
// Policy is a JSON-serializable Vault policy: a name, its rules text, and an
// optional template.
type Policy struct {
	// Name identifies the policy.
	Name string `json:"name"`
	// Rules holds the policy document text.
	Rules string `json:"rules"`
	// Template is optional and omitted from JSON when empty.
	Template string `json:"template,omitempty"`
}
// NewVaultSecretsManager builds a manager from config: it creates the API
// client, applies the optional namespace, authenticates, and then attempts to
// install the default policies.
//
// Client-creation and authentication failures are returned as wrapped errors.
// A failure while installing default policies is only logged as a warning.
func NewVaultSecretsManager(config VaultConfig) (*VaultSecretsManager, error) {
	manager := &VaultSecretsManager{
		policies: make(map[string]string),
		logger:   log.New(os.Stdout, "[VAULT] ", log.LstdFlags),
	}

	clientCfg := api.DefaultConfig()
	clientCfg.Address = config.Address

	apiClient, err := api.NewClient(clientCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create Vault client: %w", err)
	}
	manager.client = apiClient

	if config.Namespace != "" {
		manager.client.SetNamespace(config.Namespace)
	}

	if authErr := manager.authenticate(config); authErr != nil {
		return nil, fmt.Errorf("failed to authenticate with Vault: %w", authErr)
	}
	manager.logger.Println("Successfully connected to Vault")

	// Default policies are best-effort: the manager stays usable without them.
	if policyErr := manager.initializeDefaultPolicies(); policyErr != nil {
		manager.logger.Printf("Warning: failed to initialize default policies: %v", policyErr)
	}

	return manager, nil
}
// authenticate picks the first usable credential in config and logs in with
// it: a static token, then AppRole (RoleID and SecretID both set), then
// Kubernetes (K8SMountPath set). It returns an error when none applies.
func (vsm *VaultSecretsManager) authenticate(config VaultConfig) error {
	switch {
	case config.Token != "":
		// A static token needs no login round-trip.
		vsm.client.SetToken(config.Token)
		return nil
	case config.RoleID != "" && config.SecretID != "":
		return vsm.authenticateWithAppRole(config.RoleID, config.SecretID)
	case config.K8SMountPath != "":
		return vsm.authenticateWithK8S(config.K8SMountPath)
	default:
		return fmt.Errorf("no valid authentication method provided")
	}
}
// authenticateWithAppRole logs in at auth/approle/login with the given role
// and secret IDs and installs the returned client token on the client.
// It fails when the request errors or the response carries no token.
func (vsm *VaultSecretsManager) authenticateWithAppRole(roleID, secretID string) error {
	credentials := map[string]interface{}{
		"role_id":   roleID,
		"secret_id": secretID,
	}

	resp, err := vsm.client.Logical().Write("auth/approle/login", credentials)
	if err != nil {
		return fmt.Errorf("AppRole authentication failed: %w", err)
	}
	if resp == nil || resp.Auth == nil || resp.Auth.ClientToken == "" {
		return fmt.Errorf("no token received from AppRole authentication")
	}

	auth := resp.Auth
	vsm.client.SetToken(auth.ClientToken)
	vsm.logger.Printf("Authenticated with AppRole, token expires: %v", auth.LeaseDuration)
	return nil
}
// authenticateWithK8S logs in through the Kubernetes auth method mounted at
// mountPath, presenting the pod's service-account JWT, and installs the
// returned client token on the client.
//
// It fails when the service-account token file cannot be read, the login
// request errors, or the response carries no token.
func (vsm *VaultSecretsManager) authenticateWithK8S(mountPath string) error {
	// The service-account token is mounted into the pod at this fixed path.
	jwt, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return fmt.Errorf("failed to read Kubernetes service account token: %w", err)
	}

	// The pod name and namespace lookups were dropped: their results were
	// never used, and Go rejects declared-but-unused variables.
	secret, err := vsm.client.Logical().Write(fmt.Sprintf("auth/%s/login", mountPath), map[string]interface{}{
		"jwt":  string(jwt),
		"role": "default", // This should be configurable
	})
	if err != nil {
		return fmt.Errorf("Kubernetes authentication failed: %w", err)
	}
	if secret == nil || secret.Auth == nil || secret.Auth.ClientToken == "" {
		return fmt.Errorf("no token received from Kubernetes authentication")
	}

	vsm.client.SetToken(secret.Auth.ClientToken)
	vsm.logger.Printf("Authenticated with Kubernetes, token expires: %v", secret.Auth.LeaseDuration)
	return nil
}
// initializeDefaultPolicies writes the built-in "admin", "readonly",
// "app-developer" and "app-readonly" policies through createPolicy.
// Each outcome is logged; individual failures do not stop the loop and the
// function always returns nil.
func (vsm *VaultSecretsManager) initializeDefaultPolicies() error {
	defaults := map[string]string{
		"admin": `
path "sys/*" {
capabilities = ["create", "read", "update", "delete", "list", "sudo"]
}
path "auth/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "secret/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "pki/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "kv/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
`,
		"readonly": `
path "secret/*" {
capabilities = ["read", "list"]
}
path "kv/*" {
capabilities = ["read", "list"]
}
path "pki/issue/*" {
capabilities = ["update"]
}
path "pki/cert/*" {
capabilities = ["read", "list"]
}
`,
		"app-developer": `
path "kv/data/apps/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "kv/metadata/apps/*" {
capabilities = ["read", "update", "delete", "list"]
}
path "pki/issue/*" {
capabilities = ["update"]
}
path "transit/encrypt/*" {
capabilities = ["update"]
}
path "transit/decrypt/*" {
capabilities = ["update"]
}
`,
		"app-readonly": `
path "kv/data/apps/*" {
capabilities = ["read", "list"]
}
path "kv/metadata/apps/*" {
capabilities = ["read", "list"]
}
path "pki/issue/*" {
capabilities = ["deny"]
}
path "transit/encrypt/*" {
capabilities = ["deny"]
}
path "transit/decrypt/*" {
capabilities = ["deny"]
}
`,
	}

	for policyName, policyRules := range defaults {
		if err := vsm.createPolicy(policyName, policyRules); err != nil {
			vsm.logger.Printf("Failed to create policy %s: %v", policyName, err)
			continue
		}
		vsm.logger.Printf("Created policy: %s", policyName)
	}
	return nil
}
// createPolicy writes rules to sys/policies/acl/<name> and, on success,
// records them in the manager's local policies map.
func (vsm *VaultSecretsManager) createPolicy(name, rules string) error {
	endpoint := fmt.Sprintf("sys/policies/acl/%s", name)
	body := map[string]interface{}{"policy": rules}

	if _, err := vsm.client.Logical().Write(endpoint, body); err != nil {
		return fmt.Errorf("failed to create policy %s: %w", name, err)
	}

	vsm.policies[name] = rules
	return nil
}
// readPolicy fetches the rules text of the ACL policy called name from
// sys/policies/acl/<name>.
//
// The ACL endpoint returns the policy body under the "policy" key, the same
// key createPolicy writes. The "rules" key is still accepted as a fallback so
// responses shaped like the older sys/policy endpoint keep working.
// It returns an error when the request fails, the policy does not exist, or
// neither key holds a string.
func (vsm *VaultSecretsManager) readPolicy(name string) (string, error) {
	secret, err := vsm.client.Logical().Read(fmt.Sprintf("sys/policies/acl/%s", name))
	if err != nil {
		return "", fmt.Errorf("failed to read policy %s: %w", name, err)
	}
	if secret == nil || secret.Data == nil {
		return "", fmt.Errorf("policy %s not found", name)
	}

	for _, key := range []string{"policy", "rules"} {
		if rules, ok := secret.Data[key].(string); ok {
			return rules, nil
		}
	}
	return "", fmt.Errorf("invalid policy data format")
}
// ListPolicies returns the names of all ACL policies.
//
// sys/policies/acl is a LIST endpoint whose names arrive under "keys", so the
// request is issued with List rather than Read. A "policies" key is accepted
// as a fallback for responses shaped like the older sys/policy endpoint.
// An empty slice is returned when Vault sends no data; non-string entries are
// skipped.
func (vsm *VaultSecretsManager) ListPolicies() ([]string, error) {
	secret, err := vsm.client.Logical().List("sys/policies/acl")
	if err != nil {
		return nil, fmt.Errorf("failed to list policies: %w", err)
	}
	if secret == nil || secret.Data == nil {
		return []string{}, nil
	}

	raw, ok := secret.Data["keys"].([]interface{})
	if !ok {
		raw, ok = secret.Data["policies"].([]interface{})
	}
	if !ok {
		return nil, fmt.Errorf("invalid policies data format")
	}

	policies := make([]string, 0, len(raw))
	for _, entry := range raw {
		if policyName, isString := entry.(string); isString {
			policies = append(policies, policyName)
		}
	}
	return policies, nil
}
// WriteSecret stores data at kv/data/<path>. When ttl is non-nil its string
// form is sent as options.ttl alongside the payload.
// NOTE(review): confirm the target KV v2 mount actually honors an
// options.ttl field; this function only forwards it.
func (vsm *VaultSecretsManager) WriteSecret(path string, data map[string]interface{}, ttl *time.Duration) error {
	payload := map[string]interface{}{"data": data}
	if ttl != nil {
		payload["options"] = map[string]interface{}{"ttl": ttl.String()}
	}

	if _, err := vsm.client.Logical().Write(fmt.Sprintf("kv/data/%s", path), payload); err != nil {
		return fmt.Errorf("failed to write secret %s: %w", path, err)
	}

	vsm.logger.Printf("Secret written: %s", path)
	return nil
}
// ReadSecret fetches kv/data/<path> and returns the inner "data" map.
// When the response also carries a "metadata" map, it is attached to the
// returned map under the "_metadata" key.
// It fails when the request errors, nothing is found, or "data" is not a map.
func (vsm *VaultSecretsManager) ReadSecret(path string) (map[string]interface{}, error) {
	resp, err := vsm.client.Logical().Read(fmt.Sprintf("kv/data/%s", path))
	if err != nil {
		return nil, fmt.Errorf("failed to read secret %s: %w", path, err)
	}
	if resp == nil || resp.Data == nil {
		return nil, fmt.Errorf("secret %s not found", path)
	}

	// The response nests the user payload one level down.
	payload, isMap := resp.Data["data"].(map[string]interface{})
	if !isMap {
		return nil, fmt.Errorf("invalid secret data format")
	}

	if meta, hasMeta := resp.Data["metadata"].(map[string]interface{}); hasMeta {
		payload["_metadata"] = meta
	}
	return payload, nil
}
// ReadSecretWithVersion fetches the given version of kv/data/<path> and
// returns its inner "data" map.
//
// When the response carries a "metadata" map, that map is annotated with
// "requested_version" and attached under "_metadata". The annotation is made
// on the typed metadata map before it is stored, because an interface{}
// value held in data cannot be indexed directly.
// It fails when the request errors, nothing is found, or "data" is not a map.
func (vsm *VaultSecretsManager) ReadSecretWithVersion(path string, version int) (map[string]interface{}, error) {
	secret, err := vsm.client.Logical().ReadWithData(fmt.Sprintf("kv/data/%s", path), map[string][]string{
		"version": {fmt.Sprintf("%d", version)},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to read secret %s version %d: %w", path, version, err)
	}
	if secret == nil || secret.Data == nil {
		return nil, fmt.Errorf("secret %s version %d not found", path, version)
	}

	data, ok := secret.Data["data"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid secret data format")
	}

	if metadata, exists := secret.Data["metadata"].(map[string]interface{}); exists {
		metadata["requested_version"] = version
		data["_metadata"] = metadata
	}
	return data, nil
}
// ListSecrets lists the entries under kv/metadata/<path>.
// It returns an empty slice when Vault sends no data, and skips any entry
// that is not a string.
func (vsm *VaultSecretsManager) ListSecrets(path string) ([]string, error) {
	resp, err := vsm.client.Logical().List(fmt.Sprintf("kv/metadata/%s", path))
	if err != nil {
		return nil, fmt.Errorf("failed to list secrets %s: %w", path, err)
	}
	if resp == nil || resp.Data == nil {
		return []string{}, nil
	}

	rawKeys, isList := resp.Data["keys"].([]interface{})
	if !isList {
		return nil, fmt.Errorf("invalid list data format")
	}

	var names []string
	for i := range rawKeys {
		name, isString := rawKeys[i].(string)
		if !isString {
			continue
		}
		names = append(names, name)
	}
	return names, nil
}
// DeleteSecret removes the secret at path from the KV v2 mount.
//
// It first soft-deletes the latest version via kv/data/<path>; a failure
// there is returned. It then attempts to permanently remove every version
// and the key's metadata via kv/metadata/<path>. That second step is
// best-effort: a failure is only logged.
//
// The permanent step targets kv/metadata because kv/destroy does not accept
// DELETE requests; it only takes a write carrying an explicit version list
// (see DeleteSecretVersion).
func (vsm *VaultSecretsManager) DeleteSecret(path string) error {
	// Delete latest version (soft delete).
	_, err := vsm.client.Logical().Delete(fmt.Sprintf("kv/data/%s", path))
	if err != nil {
		return fmt.Errorf("failed to delete secret %s: %w", path, err)
	}

	// Permanently delete all versions and the metadata.
	_, err = vsm.client.Logical().Delete(fmt.Sprintf("kv/metadata/%s", path))
	if err != nil {
		vsm.logger.Printf("Warning: failed to permanently destroy secret %s: %v", path, err)
	}

	vsm.logger.Printf("Secret deleted: %s", path)
	return nil
}
// DeleteSecretVersion permanently destroys the listed versions of the secret
// at path by writing them to kv/destroy/<path>. Version numbers are sent as
// decimal strings.
func (vsm *VaultSecretsManager) DeleteSecretVersion(path string, versions []int) error {
	encoded := make([]string, 0, len(versions))
	for _, number := range versions {
		encoded = append(encoded, fmt.Sprintf("%d", number))
	}

	body := map[string]interface{}{"versions": encoded}
	if _, err := vsm.client.Logical().Write(fmt.Sprintf("kv/destroy/%s", path), body); err != nil {
		return fmt.Errorf("failed to destroy secret versions for %s: %w", path, err)
	}

	vsm.logger.Printf("Secret versions destroyed: %s, versions: %v", path, versions)
	return nil
}
// UndeleteSecretVersions restores the listed versions of the secret at path
// by writing them to kv/undelete/<path>. Version numbers are sent as decimal
// strings.
func (vsm *VaultSecretsManager) UndeleteSecretVersions(path string, versions []int) error {
	encoded := make([]string, 0, len(versions))
	for _, number := range versions {
		encoded = append(encoded, fmt.Sprintf("%d", number))
	}

	body := map[string]interface{}{"versions": encoded}
	if _, err := vsm.client.Logical().Write(fmt.Sprintf("kv/undelete/%s", path), body); err != nil {
		return fmt.Errorf("failed to undelete secret versions for %s: %w", path, err)
	}

	vsm.logger.Printf("Secret versions undeleted: %s, versions: %v", path, versions)
	return nil
}
// GetSecretMetadata reads kv/metadata/<path> and returns the response data
// map unchanged. It fails when the request errors or nothing is returned.
func (vsm *VaultSecretsManager) GetSecretMetadata(path string) (map[string]interface{}, error) {
	resp, err := vsm.client.Logical().Read(fmt.Sprintf("kv/metadata/%s", path))
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to read metadata for %s: %w", path, err)
	case resp == nil || resp.Data == nil:
		return nil, fmt.Errorf("metadata for %s not found", path)
	}
	return resp.Data, nil
}
// UpdateSecretMetadata writes metadata to kv/metadata/<path> and logs the
// update on success.
func (vsm *VaultSecretsManager) UpdateSecretMetadata(path string, metadata map[string]interface{}) error {
	endpoint := fmt.Sprintf("kv/metadata/%s", path)
	if _, err := vsm.client.Logical().Write(endpoint, metadata); err != nil {
		return fmt.Errorf("failed to update metadata for %s: %w", path, err)
	}

	vsm.logger.Printf("Metadata updated for: %s", path)
	return nil
}
// CreateToken requests a new token carrying the given policies, TTL and
// renewable flag, and returns the client token string.
// It fails when the request errors or the response carries no token.
func (vsm *VaultSecretsManager) CreateToken(policies []string, ttl time.Duration, renewable bool) (string, error) {
	request := &api.TokenCreateRequest{
		Policies:  policies,
		TTL:       ttl.String(),
		Renewable: &renewable,
	}

	resp, err := vsm.client.Auth().Token().Create(request)
	if err != nil {
		return "", fmt.Errorf("failed to create token: %w", err)
	}
	if resp == nil || resp.Auth == nil || resp.Auth.ClientToken == "" {
		return "", fmt.Errorf("no token received")
	}

	vsm.logger.Printf("Token created with policies: %v, TTL: %v", policies, ttl)
	return resp.Auth.ClientToken, nil
}
// RenewToken renews token, requesting ttl as the lease increment.
//
// The renewal is issued as a self-renew from a cloned client that carries
// token, so the manager's own client token is left untouched. Client.Clone
// returns an error that must be checked, and RenewSelf expects the increment
// in whole seconds, so ttl is converted before the call.
// It fails when cloning or renewal errors, or when the response carries no
// auth data.
func (vsm *VaultSecretsManager) RenewToken(token string, ttl time.Duration) error {
	tempClient, err := vsm.client.Clone()
	if err != nil {
		return fmt.Errorf("failed to clone Vault client: %w", err)
	}
	tempClient.SetToken(token)

	secret, err := tempClient.Auth().Token().RenewSelf(int(ttl.Seconds()))
	if err != nil {
		return fmt.Errorf("failed to renew token: %w", err)
	}
	if secret == nil || secret.Auth == nil {
		return fmt.Errorf("no token data received during renewal")
	}

	vsm.logger.Printf("Token renewed, new TTL: %v", secret.Auth.LeaseDuration)
	return nil
}
// RevokeToken revokes token together with any child tokens created from it.
//
// The Vault client exposes the standard revoke operation as RevokeTree; the
// token API has no plain Revoke method.
func (vsm *VaultSecretsManager) RevokeToken(token string) error {
	if err := vsm.client.Auth().Token().RevokeTree(token); err != nil {
		return fmt.Errorf("failed to revoke token: %w", err)
	}

	vsm.logger.Println("Token revoked")
	return nil
}
// CreateAppRole writes an AppRole definition to auth/approle/role/<name>.
// The same ttl value is sent as both "ttl" and "period", and bind_secret_id
// is always true.
func (vsm *VaultSecretsManager) CreateAppRole(name string, policies []string, ttl time.Duration) error {
	lifetime := ttl.String()
	roleSpec := map[string]interface{}{
		"policies":       policies,
		"ttl":            lifetime,
		"period":         lifetime,
		"bind_secret_id": true,
	}

	if _, err := vsm.client.Logical().Write(fmt.Sprintf("auth/approle/role/%s", name), roleSpec); err != nil {
		return fmt.Errorf("failed to create AppRole %s: %w", name, err)
	}

	vsm.logger.Printf("AppRole created: %s", name)
	return nil
}
// GetAppRoleSecretID asks Vault to mint a new SecretID for appRole by writing
// to auth/approle/role/<appRole>/secret-id, and returns the "secret_id"
// string from the response.
func (vsm *VaultSecretsManager) GetAppRoleSecretID(appRole string) (string, error) {
	endpoint := fmt.Sprintf("auth/approle/role/%s/secret-id", appRole)

	resp, err := vsm.client.Logical().Write(endpoint, nil)
	if err != nil {
		return "", fmt.Errorf("failed to generate SecretID for AppRole %s: %w", appRole, err)
	}
	if resp == nil || resp.Data == nil {
		return "", fmt.Errorf("no SecretID data received")
	}

	id, isString := resp.Data["secret_id"].(string)
	if !isString {
		return "", fmt.Errorf("invalid SecretID data format")
	}

	vsm.logger.Printf("SecretID generated for AppRole: %s", appRole)
	return id, nil
}
// GetAppRoleRoleID reads auth/approle/role/<appRole>/role-id and returns the
// "role_id" string from the response.
func (vsm *VaultSecretsManager) GetAppRoleRoleID(appRole string) (string, error) {
	endpoint := fmt.Sprintf("auth/approle/role/%s/role-id", appRole)

	resp, err := vsm.client.Logical().Read(endpoint)
	if err != nil {
		return "", fmt.Errorf("failed to get RoleID for AppRole %s: %w", appRole, err)
	}
	if resp == nil || resp.Data == nil {
		return "", fmt.Errorf("no RoleID data received")
	}

	if id, isString := resp.Data["role_id"].(string); isString {
		return id, nil
	}
	return "", fmt.Errorf("invalid RoleID data format")
}
// Health returns the result of the client's Sys().Health() call unchanged.
func (vsm *VaultSecretsManager) Health() (*api.HealthResponse, error) {
	sys := vsm.client.Sys()
	return sys.Health()
}
// SealStatus returns the result of the client's Sys().SealStatus() call
// unchanged.
func (vsm *VaultSecretsManager) SealStatus() (*api.SealStatusResponse, error) {
	sys := vsm.client.Sys()
	return sys.SealStatus()
}
// BackupSecrets walks a fixed set of base paths ("", "apps/",
// "infrastructure/", "users/") and collects every readable secret into a map
// with two keys: "timestamp" (current UTC time) and "secrets".
// A failure on one base path is logged and the walk continues; the returned
// error is always nil.
func (vsm *VaultSecretsManager) BackupSecrets() (map[string]interface{}, error) {
	backup := make(map[string]interface{})
	backup["timestamp"] = time.Now().UTC()

	collected := make(map[string]interface{})
	backup["secrets"] = collected

	for _, basePath := range []string{"", "apps/", "infrastructure/", "users/"} {
		if err := vsm.backupSecretPath(basePath, collected); err != nil {
			vsm.logger.Printf("Warning: failed to backup path %s: %v", basePath, err)
		}
	}
	return backup, nil
}
// backupSecretPath copies everything listed under path into backup.
// Entries ending in "/" are treated as folders and recursed into, stored as
// nested maps; all other entries are read with ReadSecret. The first listing
// or read error aborts the walk and is returned.
func (vsm *VaultSecretsManager) backupSecretPath(path string, backup map[string]interface{}) error {
	entries, err := vsm.ListSecrets(path)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		fullPath := path + entry
		isFolder := len(entry) > 0 && entry[len(entry)-1] == '/'

		if !isFolder {
			// Plain secret: read it and store it under its own name.
			contents, readErr := vsm.ReadSecret(fullPath)
			if readErr != nil {
				return readErr
			}
			backup[entry] = contents
			continue
		}

		// Folder: gather its children into a nested map.
		nested := make(map[string]interface{})
		if walkErr := vsm.backupSecretPath(fullPath, nested); walkErr != nil {
			return walkErr
		}
		backup[entry] = nested
	}
	return nil
}
// Close logs that the manager has been closed. It releases nothing else.
func (vsm *VaultSecretsManager) Close() {
	const closedMessage = "Vault secrets manager closed"
	vsm.logger.Println(closedMessage)
}
// main demonstrates the manager end to end: connect, health-check, write and
// read a secret, create a policy and an AppRole, fetch the AppRole
// credentials, and mint a token. Every step after connecting only logs its
// failure and continues.
func main() {
	// Configuration
	config := VaultConfig{
		Address:      os.Getenv("VAULT_ADDR"),
		RoleID:       os.Getenv("VAULT_ROLE_ID"),
		SecretID:     os.Getenv("VAULT_SECRET_ID"),
		K8SMountPath: "kubernetes",
		Namespace:    "admin",
	}

	// Create secrets manager
	vsm, err := NewVaultSecretsManager(config)
	if err != nil {
		log.Fatalf("Failed to create Vault secrets manager: %v", err)
	}
	defer vsm.Close()

	// Health check
	health, err := vsm.Health()
	if err != nil {
		log.Printf("Health check failed: %v", err)
	} else {
		log.Printf("Vault health: %v, version: %s", health.Initialized, health.Version)
	}

	// Example: Write a secret
	appSecret := map[string]interface{}{
		"database_url": "postgresql://localhost:5432/myapp",
		"api_key":      "sk-1234567890abcdef",
		"debug_mode":   true,
	}
	// The TTL needs an addressable variable: Go cannot take the address of a
	// constant expression.
	secretTTL := 7 * 24 * time.Hour // 7 days TTL
	err = vsm.WriteSecret("apps/myapp/config", appSecret, &secretTTL)
	if err != nil {
		log.Printf("Failed to write secret: %v", err)
	}

	// Example: Read a secret
	secretData, err := vsm.ReadSecret("apps/myapp/config")
	if err != nil {
		log.Printf("Failed to read secret: %v", err)
	} else {
		log.Printf("Secret data: %+v", secretData)
	}

	// Example: Create a policy
	policy := `
path "kv/data/apps/myapp/*" {
capabilities = ["create", "read", "update", "delete"]
}
`
	err = vsm.createPolicy("myapp-policy", policy)
	if err != nil {
		log.Printf("Failed to create policy: %v", err)
	}

	// Example: Create an AppRole
	err = vsm.CreateAppRole("myapp-approle", []string{"myapp-policy"}, time.Hour*24)
	if err != nil {
		log.Printf("Failed to create AppRole: %v", err)
	}

	// Example: Get RoleID and SecretID
	roleID, err := vsm.GetAppRoleRoleID("myapp-approle")
	if err != nil {
		log.Printf("Failed to get RoleID: %v", err)
	} else {
		log.Printf("AppRole RoleID: %s", roleID)
	}
	secretID, err := vsm.GetAppRoleSecretID("myapp-approle")
	if err != nil {
		log.Printf("Failed to get SecretID: %v", err)
	} else {
		log.Printf("AppRole SecretID: %s", secretID)
	}

	// Example: Create a token
	token, err := vsm.CreateToken([]string{"myapp-policy"}, time.Hour*8, true)
	if err != nil {
		log.Printf("Failed to create token: %v", err)
	} else {
		// Log only a prefix; the length guard avoids a slice-bounds panic
		// on tokens shorter than 20 bytes.
		preview := token
		if len(preview) > 20 {
			preview = preview[:20]
		}
		log.Printf("Created token: %s", preview+"...")
	}
}
// VaultHandler exposes a VaultSecretsManager through HTTP handler methods,
// for use with web frameworks.
type VaultHandler struct {
	// vsm is the manager every handler delegates to.
	vsm *VaultSecretsManager
}
// NewVaultHandler builds a VaultSecretsManager from config and wraps it in a
// VaultHandler. Any construction error is returned as-is.
func NewVaultHandler(config VaultConfig) (*VaultHandler, error) {
	manager, err := NewVaultSecretsManager(config)
	if err == nil {
		return &VaultHandler{vsm: manager}, nil
	}
	return nil, err
}
// GetSecretHandler serves the secret named by the "path" query parameter as
// JSON.
//
// It answers 400 when the parameter is missing and 500 when the secret cannot
// be read. A failure while encoding the response is logged, since by then the
// status line may already have been sent.
func (vh *VaultHandler) GetSecretHandler(w http.ResponseWriter, r *http.Request) {
	path := r.URL.Query().Get("path")
	if path == "" {
		http.Error(w, "path parameter is required", http.StatusBadRequest)
		return
	}

	secret, err := vh.vsm.ReadSecret(path)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(secret); err != nil {
		vh.vsm.logger.Printf("Failed to encode secret response: %v", err)
	}
}
// WriteSecretHandler stores a secret described by a JSON request body with
// the fields "path", "data" and an optional "ttl" duration string.
//
// It answers 400 for an undecodable body, a missing path, or an unparsable
// TTL; 500 when the write fails; and 201 with a JSON status object on
// success. A failure while encoding the response is logged.
func (vh *VaultHandler) WriteSecretHandler(w http.ResponseWriter, r *http.Request) {
	var req struct {
		Path string                 `json:"path"`
		Data map[string]interface{} `json:"data"`
		TTL  string                 `json:"ttl,omitempty"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Reject an empty path up front, mirroring GetSecretHandler, instead of
	// forwarding a write with no secret name.
	if req.Path == "" {
		http.Error(w, "path parameter is required", http.StatusBadRequest)
		return
	}

	var ttl *time.Duration
	if req.TTL != "" {
		parsedTTL, err := time.ParseDuration(req.TTL)
		if err != nil {
			http.Error(w, "invalid TTL format", http.StatusBadRequest)
			return
		}
		ttl = &parsedTTL
	}

	if err := vh.vsm.WriteSecret(req.Path, req.Data, ttl); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Headers must be set before WriteHeader sends the status line.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	if err := json.NewEncoder(w).Encode(map[string]string{"status": "success"}); err != nil {
		vh.vsm.logger.Printf("Failed to encode write response: %v", err)
	}
}
// Exported API: VaultSecretsManager, VaultConfig, SecretData and Policy.
// Go exports identifiers by capitalization, so no export statement is needed.
💻 Moteur de Transit Vault python
🔴 complex
⭐⭐⭐⭐⭐
Implémentation du chiffrement en tant que service (Encryption as a Service) à l'aide du moteur Transit de Vault, avec rotation des clés et chiffrement des données
⏱️ 60 min
🏷️ vault, transit, encryption, python, security
Prerequisites:
Python, HashiCorp Vault, Cryptography, Security concepts
#!/usr/bin/env python3
"""
Vault Transit Secrets Engine Implementation
Encryption as a Service with key rotation and secure cryptographic operations
"""
import base64
import json
import logging
import os
import secrets
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any, Union
from datetime import datetime, timedelta
import hvac
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class EncryptionKey:
    """Descriptive record for a Transit key, as built by get_key_info."""

    # Identity
    name: str
    type: str  # aes256-gcm, chacha20-poly1305, ed25519, rsa-2048, etc.

    # Versioning
    version: int
    created_at: datetime
    latest_version: int
    min_decryption_version: int

    # Management flags
    deletion_allowed: bool
    exportable: bool

    # Capability flags
    supports_encryption: bool
    supports_decryption: bool
    supports_signing: bool
    supports_derivation: bool
@dataclass
class EncryptionResult:
    """Outcome of an encrypt call: the ciphertext plus the key that produced it."""

    ciphertext: str
    iv: Optional[str]
    tag: Optional[str]
    key_version: int
    key_name: str
    encryption_type: str
    timestamp: datetime
    # Caller-supplied context, if any; the only field with a default.
    context: Optional[Dict[str, Any]] = None
@dataclass
class DecryptionResult:
    """Outcome of a decrypt call: the recovered plaintext plus key details."""

    plaintext: str
    key_name: str
    key_version: int
    encryption_type: str
    timestamp: datetime
@dataclass
class SignatureResult:
    """Outcome of a sign call: the signature plus the key and algorithm used."""

    signature: str
    key_name: str
    key_version: int
    algorithm: str
    timestamp: datetime
@dataclass
class KeyRotationResult:
    """Outcome of a key rotation: version before/after, when, and status."""

    previous_version: int
    new_version: int
    rotated_at: datetime
    success: bool
    message: str
class VaultTransitClient:
"""
Vault Transit Secrets Engine Client
Provides encryption as a service with key management
"""
def __init__(
    self,
    vault_url: str,
    token: str,
    mount_path: str = "transit",
    namespace: Optional[str] = None
):
    """
    Set up the Transit client and confirm Vault is reachable.

    Args:
        vault_url: Vault server URL
        token: Vault authentication token
        mount_path: Transit engine mount path (trailing slashes are stripped)
        namespace: Vault namespace (optional)

    Raises:
        Exception: whatever _verify_connection raises when the check fails
    """
    # Keep the connection settings as supplied (mount path normalised).
    self.vault_url = vault_url
    self.token = token
    self.mount_path = mount_path.rstrip('/')
    self.namespace = namespace

    client_options = {"url": vault_url, "token": token, "namespace": namespace}
    self.client = hvac.Client(**client_options)

    # Fail fast if Vault cannot be reached or is not initialized.
    self._verify_connection()

    # Per-key info cache; entries older than _cache_ttl seconds are refetched.
    self._key_cache: Dict[str, EncryptionKey] = {}
    self._cache_ttl = 300  # 5 minutes
    self._last_cache_update = {}

    logger.info(f"Vault Transit client initialized: {vault_url}")
def _verify_connection(self):
    """
    Verify that Vault is reachable and initialized.

    The health endpoint is queried with GET so that a JSON body is returned;
    a bodiless HEAD response cannot be inspected for the 'initialized' flag.
    If the client hands back a response object instead of a dict, its JSON
    body is decoded before the flag is read.

    Raises:
        Exception: if Vault reports it is not initialized, or if the health
            request itself fails (the original error is logged and re-raised)
    """
    try:
        health = self.client.sys.read_health_status(method='GET')
        if not isinstance(health, dict):
            # Some statuses come back as a raw response rather than parsed JSON.
            health = health.json()
        if not health.get('initialized'):
            raise Exception("Vault is not initialized")
        logger.info("Vault connection verified")
    except Exception as e:
        logger.error(f"Failed to connect to Vault: {e}")
        raise
def create_key(
    self,
    key_name: str,
    key_type: str = "aes256-gcm",
    exportable: bool = False,
    allow_plaintext_backup: bool = False,
    auto_rotate_period: Optional[str] = None,
    convergent_encryption: bool = False
) -> bool:
    """
    Create a new encryption key.

    Args:
        key_name: Name of the key
        key_type: Type of key (aes256-gcm, chacha20-poly1305, ed25519, rsa-2048, etc.)
        exportable: Whether the key can be exported
        allow_plaintext_backup: Whether plaintext backup is allowed
        auto_rotate_period: Auto-rotation period (e.g., "30d", "1h")
        convergent_encryption: Whether to use convergent encryption

    Returns:
        True if the key was created; False if the request raised (the error
        is logged, not propagated)
    """
    try:
        config = {
            "type": key_type,
            "exportable": exportable,
            "allow_plaintext_backup": allow_plaintext_backup,
            "convergent_encryption": convergent_encryption
        }
        if auto_rotate_period:
            config["auto_rotate_period"] = auto_rotate_period

        path = f"{self.mount_path}/keys/{key_name}"
        # The payload must go as keyword arguments: the second positional
        # parameter of hvac's write() is wrap_ttl, not the request body.
        self.client.write(path, **config)
        logger.info(f"Created key: {key_name} (type: {key_type})")

        # Drop any cached info and its timestamp for this key.
        self._key_cache.pop(key_name, None)
        self._last_cache_update.pop(key_name, None)
        return True
    except Exception as e:
        logger.error(f"Failed to create key {key_name}: {e}")
        return False
def get_key_info(self, key_name: str, use_cache: bool = True) -> Optional[EncryptionKey]:
    """
    Look up a key's description, serving it from the cache when fresh.

    Args:
        key_name: Name of the key
        use_cache: Whether a cached entry younger than the cache TTL may be used

    Returns:
        EncryptionKey, or None when the key is not found or the lookup
        raises (the error is logged, not propagated)
    """
    if use_cache and key_name in self._key_cache:
        entry_age = time.time() - self._last_cache_update.get(key_name, 0)
        if entry_age < self._cache_ttl:
            return self._key_cache[key_name]

    try:
        response = self.client.read(f"{self.mount_path}/keys/{key_name}")
        if not response or 'data' not in response:
            return None
        data = response['data']

        # NOTE(review): assumes the response has a top-level ISO-8601
        # 'creation_time' string; confirm against the Transit read-key API.
        fields = {
            'name': key_name,
            'type': data['type'],
            'version': data['latest_version'],
            'created_at': datetime.fromisoformat(data['creation_time'].replace('Z', '+00:00')),
            'latest_version': data['latest_version'],
            'min_decryption_version': data['min_decryption_version'],
            'deletion_allowed': data.get('deletion_allowed', False),
            'exportable': data.get('exportable', False),
            'supports_encryption': data.get('supports_encryption', False),
            'supports_decryption': data.get('supports_decryption', False),
            'supports_signing': data.get('supports_signing', False),
            'supports_derivation': data.get('supports_derivation', False),
        }
        key_info = EncryptionKey(**fields)

        # Remember the result and when it was fetched.
        self._key_cache[key_name] = key_info
        self._last_cache_update[key_name] = time.time()
        return key_info
    except Exception as e:
        logger.error(f"Failed to get key info for {key_name}: {e}")
        return None
def list_keys(self) -> List[str]:
    """
    Return the key names listed under the Transit mount.

    Returns:
        The 'keys' list from the response, or an empty list when the
        response is empty or the request raises (the error is logged)
    """
    try:
        response = self.client.list(f"{self.mount_path}/keys")
        if response and 'data' in response:
            return response['data'].get('keys', [])
        return []
    except Exception as e:
        logger.error(f"Failed to list keys: {e}")
        return []
def encrypt(
    self,
    key_name: str,
    plaintext: Union[str, bytes],
    context: Optional[Dict[str, Any]] = None,
    key_version: Optional[int] = None,
    nonce: Optional[bytes] = None
) -> EncryptionResult:
    """
    Encrypt data using the Vault Transit engine.

    Args:
        key_name: Name of the encryption key
        plaintext: Data to encrypt (str is UTF-8 encoded first)
        context: Optional context, sent as base64 of its compact JSON form
        key_version: Specific key version to use (optional)
        nonce: Custom nonce, sent base64-encoded (optional)

    Returns:
        EncryptionResult built from the response

    Raises:
        Exception: if Vault returns no data, or the request fails (the
            error is logged and re-raised)
    """
    try:
        # Transit expects base64 plaintext; encode str input as UTF-8 first.
        if isinstance(plaintext, bytes):
            plaintext_b64 = base64.b64encode(plaintext).decode('utf-8')
        else:
            plaintext_b64 = base64.b64encode(plaintext.encode('utf-8')).decode('utf-8')

        encrypt_data = {"plaintext": plaintext_b64}
        if context:
            context_json = json.dumps(context, separators=(',', ':'))
            context_b64 = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')
            encrypt_data["context"] = context_b64
        if key_version:
            encrypt_data["key_version"] = key_version
        if nonce:
            encrypt_data["nonce"] = base64.b64encode(nonce).decode('utf-8')

        path = f"{self.mount_path}/encrypt/{key_name}"
        # The payload must go as keyword arguments: the second positional
        # parameter of hvac's write() is wrap_ttl, not the request body.
        response = self.client.write(path, **encrypt_data)
        if not response or 'data' not in response:
            raise Exception("No response from Vault")

        data = response['data']
        result = EncryptionResult(
            ciphertext=data['ciphertext'],
            iv=data.get('iv'),
            tag=data.get('tag'),
            key_version=data['key_version'],
            key_name=key_name,
            encryption_type="vault-transit",
            timestamp=datetime.utcnow(),
            context=context
        )
        logger.info(f"Encrypted data with key {key_name}, version {result.key_version}")
        return result
    except Exception as e:
        logger.error(f"Failed to encrypt data with key {key_name}: {e}")
        raise
def decrypt(
    self,
    key_name: str,
    ciphertext: str,
    context: Optional[Dict[str, Any]] = None,
    key_version: Optional[int] = None
) -> DecryptionResult:
    """
    Decrypt data using the Vault Transit engine.

    Args:
        key_name: Name of the decryption key
        ciphertext: Ciphertext to decrypt
        context: Context used during encryption, sent as base64 of its
            compact JSON form (if applicable)
        key_version: Expected key version (optional)

    Returns:
        DecryptionResult whose plaintext is the UTF-8 decoding of the
        base64 value Vault returned

    Raises:
        Exception: if Vault returns no data, or the request or decoding
            fails (the error is logged and re-raised)
    """
    try:
        decrypt_data = {"ciphertext": ciphertext}
        if context:
            context_json = json.dumps(context, separators=(',', ':'))
            context_b64 = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')
            decrypt_data["context"] = context_b64
        if key_version:
            decrypt_data["key_version"] = key_version

        path = f"{self.mount_path}/decrypt/{key_name}"
        # The payload must go as keyword arguments: the second positional
        # parameter of hvac's write() is wrap_ttl, not the request body.
        response = self.client.write(path, **decrypt_data)
        if not response or 'data' not in response:
            raise Exception("No response from Vault")

        data = response['data']
        # Vault returns base64; decode to bytes, then to text.
        plaintext_bytes = base64.b64decode(data['plaintext'])
        plaintext = plaintext_bytes.decode('utf-8')

        result = DecryptionResult(
            plaintext=plaintext,
            key_name=key_name,
            key_version=data.get('key_version', 0),
            encryption_type="vault-transit",
            timestamp=datetime.utcnow()
        )
        logger.info(f"Decrypted data with key {key_name}, version {result.key_version}")
        return result
    except Exception as e:
        logger.error(f"Failed to decrypt data with key {key_name}: {e}")
        raise
def sign_data(
    self,
    key_name: str,
    input_data: Union[str, bytes],
    algorithm: str = "sha2-256",
    context: Optional[Dict[str, Any]] = None,
    key_version: Optional[int] = None
) -> SignatureResult:
    """
    Produce a signature for the given data through the Transit sign endpoint.

    Args:
        key_name: Transit key to sign with
        input_data: Payload to sign; text is UTF-8 encoded before base64 encoding
        algorithm: Hash algorithm, placed both in the request path and the body
        context: Optional mapping, sent as base64-encoded compact JSON
        key_version: Added to the request as "key_version" when truthy

    Returns:
        SignatureResult with the signature and the key version from the
        response (0 when the response does not include one)

    Raises:
        Exception: Every failure is logged and re-raised, including a response
            without a 'data' section.
    """
    try:
        raw = input_data if isinstance(input_data, bytes) else input_data.encode('utf-8')
        payload = {
            "input": base64.b64encode(raw).decode('utf-8'),
            "algorithm": algorithm
        }
        if context:
            compact = json.dumps(context, separators=(',', ':')).encode('utf-8')
            payload["context"] = base64.b64encode(compact).decode('utf-8')
        if key_version:
            payload["key_version"] = key_version

        reply = self.client.write(f"{self.mount_path}/sign/{key_name}/{algorithm}", payload)
        if not reply or 'data' not in reply:
            raise Exception("No response from Vault")

        body = reply['data']
        outcome = SignatureResult(
            signature=body['signature'],
            key_name=key_name,
            key_version=body.get('key_version', 0),
            algorithm=algorithm,
            timestamp=datetime.utcnow()
        )
        logger.info(f"Signed data with key {key_name}, algorithm {algorithm}")
        return outcome
    except Exception as exc:
        logger.error(f"Failed to sign data with key {key_name}: {exc}")
        raise
def verify_signature(
    self,
    key_name: str,
    input_data: Union[str, bytes],
    signature: str,
    algorithm: str = "sha2-256",
    context: Optional[Dict[str, Any]] = None,
    hmac: bool = False
) -> bool:
    """
    Verify a signature (or an HMAC) using the Vault Transit engine.

    Args:
        key_name: Name of the verification key
        input_data: Original data that was signed; text is UTF-8 encoded
        signature: Signature (or HMAC value when hmac=True) to verify
        algorithm: Hash algorithm that was used
        context: Context used during signing (if applicable)
        hmac: When True, `signature` is submitted as an HMAC instead of a signature

    Returns:
        True if Vault reports the value as valid. False if it is invalid or if
        the request fails for any reason (failures are logged, not raised).
    """
    try:
        # Convert input data to base64
        raw = input_data if isinstance(input_data, bytes) else input_data.encode('utf-8')
        verify_data = {
            "input": base64.b64encode(raw).decode('utf-8'),
            "algorithm": algorithm
        }
        # The verify endpoint expects the value to check under exactly one of
        # "signature" or "hmac". Sending a boolean "hmac" next to "signature"
        # makes Vault reject the request, so pick the field from the flag.
        verify_data["hmac" if hmac else "signature"] = signature
        if context:
            context_json = json.dumps(context, separators=(',', ':'))
            verify_data["context"] = base64.b64encode(context_json.encode('utf-8')).decode('utf-8')

        path = f"{self.mount_path}/verify/{key_name}/{algorithm}"
        response = self.client.write(path, verify_data)
        if not response or 'data' not in response:
            raise Exception("No response from Vault")

        is_valid = response['data'].get('valid', False)
        logger.info(f"Signature verification for key {key_name}: {'valid' if is_valid else 'invalid'}")
        return is_valid
    except Exception as e:
        # Deliberate best-effort: a failed verification request counts as "not valid".
        logger.error(f"Failed to verify signature with key {key_name}: {e}")
        return False
def rotate_key(self, key_name: str) -> KeyRotationResult:
    """
    Rotate a Transit key and report the version change.

    The key's latest version is read before and after the rotate request
    (both reads bypass the cache) so the result can state both versions.

    Args:
        key_name: Name of the key to rotate

    Returns:
        KeyRotationResult. This method does not raise: any failure is logged
        and reported with success=False and both versions set to 0.
    """
    try:
        before = self.get_key_info(key_name, use_cache=False)
        if not before:
            raise Exception(f"Key {key_name} not found")
        old_version = before.latest_version

        # A falsy reply from the rotate call is treated as a failure.
        if not self.client.write(f"{self.mount_path}/keys/{key_name}/rotate", {}):
            raise Exception("No response from Vault during rotation")

        after = self.get_key_info(key_name, use_cache=False)
        if not after:
            raise Exception("Failed to get updated key info")

        outcome = KeyRotationResult(
            previous_version=old_version,
            new_version=after.latest_version,
            rotated_at=datetime.utcnow(),
            success=True,
            message=f"Key rotated from version {old_version} to {after.latest_version}"
        )
        logger.info(outcome.message)
        return outcome
    except Exception as exc:
        failure = f"Failed to rotate key {key_name}: {exc}"
        logger.error(failure)
        return KeyRotationResult(
            previous_version=0,
            new_version=0,
            rotated_at=datetime.utcnow(),
            success=False,
            message=failure
        )
def configure_key_rotation(
    self,
    key_name: str,
    auto_rotate_period: str,
    min_available_version: Optional[int] = None
) -> bool:
    """
    Write rotation settings to the key's config endpoint.

    Args:
        key_name: Name of the key to configure
        auto_rotate_period: Rotation period string (e.g., "30d", "1h")
        min_available_version: Sent as "min_available_version" when truthy
            NOTE(review): Vault documents this field on the key trim endpoint;
            confirm the config endpoint honours it.

    Returns:
        True when the write completed without raising, False otherwise
        (the error is logged, not raised).
    """
    try:
        settings = {"auto_rotate_period": auto_rotate_period}
        if min_available_version:
            settings["min_available_version"] = min_available_version
        self.client.write(f"{self.mount_path}/keys/{key_name}/config", settings)
        logger.info(f"Configured key rotation for {key_name}: {auto_rotate_period}")
        return True
    except Exception as exc:
        logger.error(f"Failed to configure key rotation for {key_name}: {exc}")
        return False
def export_key(
    self,
    key_name: str,
    key_version: Optional[int] = None,
    key_type: str = "encryption-key"
) -> Optional[Dict[str, Any]]:
    """
    Export a key (if exportable).

    Args:
        key_name: Name of the key
        key_version: Specific version to export (optional; all versions when omitted)
        key_type: Type of key to export (encryption-key, signing-key, etc.)

    Returns:
        The 'data' section of the export response, or None if the response is
        empty or the request failed (failures are logged, not raised).
    """
    try:
        path = f"{self.mount_path}/export/{key_type}/{key_name}"
        if key_version:
            # Transit selects a version with a trailing path segment
            # (/export/:key_type/:name/:version), not a query string.
            path = f"{path}/{key_version}"
        response = self.client.read(path)
        if not response or 'data' not in response:
            return None
        key_data = response['data']
        logger.info(f"Exported key {key_name}, version {key_version or 'latest'}")
        return key_data
    except Exception as e:
        logger.error(f"Failed to export key {key_name}: {e}")
        return None
def import_key(
    self,
    key_name: str,
    key_data: Dict[str, Any],
    key_type: str = "encryption-key"
) -> bool:
    """
    Send key material to the import path for the given key.

    NOTE(review): the request goes to "<mount>/import/<key_type>/<key_name>";
    confirm this matches the Transit import endpoint of the Vault version in use.

    Args:
        key_name: Name to give the imported key
        key_data: Request body, passed to the client unchanged
        key_type: Key type segment used in the request path

    Returns:
        True when the write completed without raising, False otherwise
        (the error is logged, not raised).
    """
    try:
        target = f"{self.mount_path}/import/{key_type}/{key_name}"
        self.client.write(target, key_data)
        logger.info(f"Imported key {key_name} of type {key_type}")
        return True
    except Exception as exc:
        logger.error(f"Failed to import key {key_name}: {exc}")
        return False
def delete_key(
    self,
    key_name: str,
    force: bool = False
) -> bool:
    """
    Delete a Transit key.

    Args:
        key_name: Name of the key to delete
        force: When True, skip the local deletion_allowed pre-check and send
            the delete request directly. Nothing here changes the key's
            configuration first.

    Returns:
        True if the delete request completed without raising. False if the
        key info could not be read, deletion is not allowed, or the request
        failed (failures are logged, not raised).
    """
    try:
        if not force:
            # Look at fresh key info (cache bypassed) before attempting the delete.
            info = self.get_key_info(key_name, use_cache=False)
            if not info:
                return False
            if not info.deletion_allowed:
                logger.error(f"Key {key_name} deletion is not allowed")
                return False

        self.client.delete(f"{self.mount_path}/keys/{key_name}")
        logger.info(f"Deleted key: {key_name}")
        return True
    except Exception as exc:
        logger.error(f"Failed to delete key {key_name}: {exc}")
        return False
def get_usage_stats(self, key_name: str) -> Optional[Dict[str, Any]]:
    """
    Read the usage data stored under the key's "usage" path.

    Args:
        key_name: Name of the key

    Returns:
        The 'data' section of the response, or None when the response is
        empty, lacks 'data', or the read fails (failures are logged).
    """
    try:
        reply = self.client.read(f"{self.mount_path}/keys/{key_name}/usage")
        if reply and 'data' in reply:
            return reply['data']
        return None
    except Exception as exc:
        logger.error(f"Failed to get usage stats for {key_name}: {exc}")
        return None
def backup_keys(self, backup_dir: str = "vault_transit_backup") -> bool:
    """
    Backup all keys and their configurations to a timestamped JSON file.

    The backup may contain exported key material, so the directory is created
    owner-only (0o700) and the file is created with mode 0o600.

    Args:
        backup_dir: Directory to save backups (created if missing)

    Returns:
        True if the backup file was written. A key that fails to back up is
        logged and skipped without failing the whole backup. False if listing
        keys, creating the directory or writing the file fails.
    """
    try:
        import os
        import json
        from datetime import datetime

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(backup_dir, mode=0o700, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = os.path.join(backup_dir, f"transit_keys_backup_{timestamp}.json")

        keys = self.list_keys()
        backup_data = {
            "timestamp": timestamp,
            "vault_url": self.vault_url,
            "mount_path": self.mount_path,
            "keys": {}
        }
        for key_name in keys:
            try:
                key_info = self.get_key_info(key_name, use_cache=False)
                if not key_info:
                    continue
                backup_data["keys"][key_name] = {
                    "type": key_info.type,
                    "version": key_info.version,
                    "latest_version": key_info.latest_version,
                    "min_decryption_version": key_info.min_decryption_version,
                    "supports_encryption": key_info.supports_encryption,
                    "supports_decryption": key_info.supports_decryption,
                    "supports_signing": key_info.supports_signing,
                    "exportable": key_info.exportable,
                    "deletion_allowed": key_info.deletion_allowed
                }
                # Export key if exportable
                if key_info.exportable:
                    exported_key = self.export_key(key_name)
                    if exported_key:
                        backup_data["keys"][key_name]["exported_data"] = exported_key
            except Exception as e:
                # Best effort: one failing key must not abort the whole backup.
                logger.error(f"Failed to backup key {key_name}: {e}")

        # Create the file owner-read/write only instead of relying on the umask.
        fd = os.open(backup_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(backup_data, f, indent=2)
        logger.info(f"Backup saved to {backup_file}")
        return True
    except Exception as e:
        logger.error(f"Failed to backup keys: {e}")
        return False
def close(self):
    """Shut down the underlying Vault client, then log that the connection was closed."""
    vault_client = self.client
    vault_client.close()
    logger.info("Vault Transit client connection closed")
class TransitEncryptor:
    """
    High-level encryption wrapper using Vault Transit.

    Pairs a VaultTransitClient with a default key name and offers string and
    file helpers on top of the client's encrypt/decrypt calls.
    """

    def __init__(self, transit_client: "VaultTransitClient", default_key: str):
        """
        Initialize TransitEncryptor.

        Args:
            transit_client: VaultTransitClient instance
            default_key: Default key name for encryption operations
        """
        self.client = transit_client
        self.default_key = default_key

    def encrypt_string(
        self,
        plaintext: str,
        key_name: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Encrypt a string and wrap the result in a portable envelope.

        Args:
            plaintext: String to encrypt
            key_name: Key name to use (optional, uses default if not provided)
            context: Encryption context (optional). It is NOT stored in the
                envelope; pass the same context to decrypt_string.

        Returns:
            Base64-encoded JSON envelope with ciphertext, iv, tag, key_version,
            key_name and timestamp
        """
        key = key_name or self.default_key
        result = self.client.encrypt(key, plaintext, context=context)
        # Return as JSON for easy storage and transport
        envelope = {
            'ciphertext': result.ciphertext,
            'iv': result.iv,
            'tag': result.tag,
            'key_version': result.key_version,
            'key_name': result.key_name,
            'timestamp': result.timestamp.isoformat()
        }
        return base64.b64encode(json.dumps(envelope).encode()).decode()

    def decrypt_string(
        self,
        encrypted_data: str,
        key_name: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Decrypt a string produced by encrypt_string (or a bare ciphertext).

        Args:
            encrypted_data: Base64-encoded JSON envelope, or a raw ciphertext
                string (legacy format)
            key_name: Key name to use (optional, extracted from the envelope,
                or the default key for the legacy format)
            context: Context that was used at encryption time (optional).
                Needed because the envelope does not carry the context.

        Returns:
            Decrypted string

        Raises:
            Exception: Whatever the client's decrypt call raises
        """
        try:
            envelope = json.loads(base64.b64decode(encrypted_data.encode()).decode())
            ciphertext = envelope['ciphertext']
            target_key = key_name or envelope['key_name']
            key_version = envelope.get('key_version')
        except (ValueError, KeyError, TypeError):
            # Not a decodable envelope (bad base64/UTF-8/JSON, or missing
            # fields): treat the input as a bare ciphertext (legacy format).
            ciphertext = encrypted_data
            target_key = key_name or self.default_key
            key_version = None
        result = self.client.decrypt(
            target_key,
            ciphertext,
            context=context,
            key_version=key_version
        )
        return result.plaintext

    def encrypt_file(
        self,
        file_path: str,
        key_name: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> "EncryptionResult":
        """
        Encrypt a file's contents (the whole file is read into memory).

        Args:
            file_path: Path to file to encrypt
            key_name: Key name to use (optional)
            context: Encryption context (optional)

        Returns:
            EncryptionResult object
        """
        with open(file_path, 'rb') as f:
            file_data = f.read()
        key = key_name or self.default_key
        return self.client.encrypt(key, file_data, context=context)

    def decrypt_file(
        self,
        encrypted_result: "EncryptionResult",
        output_path: str
    ) -> bool:
        """
        Decrypt a file and write it to disk.

        The client's decrypt call returns the plaintext as UTF-8 text, which is
        re-encoded here, so only files whose content is valid UTF-8 round-trip.

        Args:
            encrypted_result: EncryptionResult from file encryption
            output_path: Path to write decrypted file

        Returns:
            True if successful, False on any failure (logged, not raised)
        """
        try:
            result = self.client.decrypt(
                encrypted_result.key_name,
                encrypted_result.ciphertext,
                context=encrypted_result.context
            )
            # Write decrypted data to file
            with open(output_path, 'wb') as f:
                f.write(result.plaintext.encode('utf-8'))
            logger.info(f"File decrypted and saved to {output_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to decrypt file: {e}")
            return False
# Example usage
def main():
    """
    Walk through the Transit client end to end against a live Vault server.

    Reads VAULT_URL and VAULT_TOKEN from the environment, creates demo keys,
    then encrypts, decrypts, signs, verifies, rotates, lists, backs up and
    finally deletes them. Any error is printed; the client is always closed.
    """
    # Initialize Vault Transit client
    client = VaultTransitClient(
        vault_url=os.getenv("VAULT_URL", "https://vault.example.com:8200"),
        token=os.getenv("VAULT_TOKEN"),
        mount_path="transit",
        namespace="secret"
    )
    try:
        # Create a new key
        key_name = "my-app-key"
        # NOTE(review): Vault's own name for this key type is "aes256-gcm96";
        # confirm create_key accepts "aes256-gcm".
        client.create_key(
            key_name=key_name,
            key_type="aes256-gcm",
            auto_rotate_period="90d"
        )
        # The sign/verify steps below use a separate key, so it has to exist
        # first. Without it signing fails and the rest of the demo is skipped.
        # NOTE(review): assumes create_key forwards key_type to Vault; confirm
        # "ecdsa-p256" is accepted.
        signing_key = f"{key_name}-signing"
        client.create_key(
            key_name=signing_key,
            key_type="ecdsa-p256",
            auto_rotate_period="90d"
        )
        # Get key information
        key_info = client.get_key_info(key_name)
        print(f"Key info: {key_info}")
        # Initialize high-level encryptor
        encryptor = TransitEncryptor(client, key_name)
        # Encrypt a string
        secret_data = "This is my secret message"
        encrypted = encryptor.encrypt_string(secret_data)
        print(f"Encrypted: {encrypted[:50]}...")
        # Decrypt the string
        decrypted = encryptor.decrypt_string(encrypted)
        print(f"Decrypted: {decrypted}")
        # Encrypt with context
        context = {"user_id": "12345", "app": "myapp"}
        encrypted_with_context = client.encrypt(key_name, secret_data, context=context)
        print(f"Encrypted with context version: {encrypted_with_context.key_version}")
        # Decrypt with context
        decrypted_with_context = client.decrypt(key_name, encrypted_with_context.ciphertext, context=context)
        print(f"Decrypted with context: {decrypted_with_context.plaintext}")
        # Sign and verify data
        signature_result = client.sign_data(signing_key, secret_data, algorithm="sha2-256")
        print(f"Signature: {signature_result.signature[:50]}...")
        is_valid = client.verify_signature(signing_key, secret_data, signature_result.signature)
        print(f"Signature valid: {is_valid}")
        # Rotate key
        rotation_result = client.rotate_key(key_name)
        print(f"Key rotation: {rotation_result.message}")
        # List all keys
        keys = client.list_keys()
        print(f"Available keys: {keys}")
        # Backup keys
        client.backup_keys()
        # Clean up both demo keys
        client.delete_key(key_name)
        client.delete_key(signing_key)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        client.close()
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
# Example Flask integration
"""
from flask import Flask, request, jsonify
from vault_transit import VaultTransitClient, TransitEncryptor
app = Flask(__name__)
# Initialize Vault Transit client
transit_client = VaultTransitClient(
vault_url=os.getenv("VAULT_URL"),
token=os.getenv("VAULT_TOKEN")
)
# Initialize encryptor
encryptor = TransitEncryptor(transit_client, "app-default-key")
@app.route('/api/encrypt', methods=['POST'])
def encrypt_endpoint():
data = request.get_json()
plaintext = data.get('plaintext')
key_name = data.get('key_name')
context = data.get('context')
if not plaintext:
return jsonify({'error': 'plaintext is required'}), 400
try:
encrypted = encryptor.encrypt_string(plaintext, key_name, context)
return jsonify({'encrypted': encrypted})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/decrypt', methods=['POST'])
def decrypt_endpoint():
data = request.get_json()
encrypted_data = data.get('encrypted')
key_name = data.get('key_name')
if not encrypted_data:
return jsonify({'error': 'encrypted data is required'}), 400
try:
decrypted = encryptor.decrypt_string(encrypted_data, key_name)
return jsonify({'decrypted': decrypted})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
"""
# Public names of this module. The previous line was a JavaScript `export`
# statement, which is a syntax error in Python.
__all__ = ["VaultTransitClient", "TransitEncryptor", "EncryptionResult", "DecryptionResult"]