🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples SAML
Exemples d'implémentation SAML (Security Assertion Markup Language) incluant SSO, métadonnées et configurations d'Identity Provider
💻 Flux d'Authentification SAML SSO typescript
🟡 intermediate
⭐⭐⭐⭐
Implémentation complète du flux Single Sign-On SAML 2.0 avec Service Provider et Identity Provider
⏱️ 45 min
🏷️ saml, authentication, sso, security
Prerequisites:
SAML 2.0 concepts, TypeScript/JavaScript, XML handling, Cryptography basics
// SAML 2.0 Single Sign-On Implementation
// Complete flow between Service Provider (SP) and Identity Provider (IdP)
import * as crypto from 'crypto';
import * as zlib from 'zlib';
import { XMLBuilder, XMLParser } from 'fast-xml-parser';
import * as querystring from 'querystring';
import * as xmlCrypto from 'xml-crypto';
/**
 * Settings for a SAML 2.0 Service Provider (SP) talking to one Identity
 * Provider (IdP).
 */
interface SAMLConfig {
  // --- Service Provider side ---
  /** SP entity ID. */
  issuer: string;
  /** Assertion Consumer Service (ACS) URL of the SP. */
  callbackUrl: string;
  /** SP private key. */
  privateKey?: string;

  // --- Identity Provider side ---
  /** IdP single sign-on URL. */
  entryPoint: string;
  /** IdP certificate. */
  cert: string;

  // --- Protocol options ---
  /** Signature algorithm identifier (the example configuration uses an XML-DSig URI). */
  signatureAlgorithm: string;
  /** Digest algorithm identifier (the example configuration uses an XML-Enc URI). */
  digestAlgorithm: string;
  /** NameID format requested from the IdP. */
  nameIdFormat: string;
  /** Clock-skew tolerance, in milliseconds, applied to assertion time checks. */
  acceptedClockSkewMs?: number;
}
/**
 * Information extracted from a processed SAML response.
 */
interface SAMLResponse {
  // --- Taken from the <Response> element ---
  /** Value of the response's `ID` attribute. */
  id: string;
  /** Value of the response's `InResponseTo` attribute. */
  inResponseTo: string;
  /** Value of the response's `Destination` attribute. */
  destination: string;
  /** Value of the response's `IssueInstant` attribute. */
  issueInstant: string;

  // --- Taken from the <Assertion> element ---
  /** Issuer of the assertion. */
  issuer: string;
  /** Subject NameID of the assertion. */
  nameId: string;
  /** Attribute values, keyed by attribute name. */
  attributes: Record<string, string[]>;
  /** Validity conditions of the assertion, when it carries any. */
  conditions?: {
    notBefore: string;
    notOnOrAfter: string;
    /** Audiences listed in the assertion's audience restriction(s). */
    audienceRestriction?: string[];
  };
}
/**
 * Minimal SAML 2.0 Service Provider client: builds AuthnRequest and
 * LogoutRequest messages and validates the IdP's response.
 *
 * Known limitations (not addressed here):
 * - Element names are matched with the literal `samlp:` / `saml:` / `ds:`
 *   prefixes, so responses that use other prefixes are rejected.
 * - `InResponseTo` is returned to the caller but not checked against an
 *   outstanding request ID, and the generated RelayState is not stored.
 * - NOTE(review): the signature check does not confirm that the signed
 *   reference covers the element whose data is consumed (signature-wrapping
 *   hardening) - confirm against the xml-crypto version in use.
 */
class SAMLClient {
  private config: SAMLConfig;
  private xmlParser: XMLParser;
  private xmlBuilder: XMLBuilder;

  /**
   * @param config SP/IdP settings; `acceptedClockSkewMs` defaults to 0.
   */
  constructor(config: SAMLConfig) {
    this.config = {
      acceptedClockSkewMs: 0,
      ...config
    };
    this.xmlParser = new XMLParser({
      ignoreAttributes: false,
      attributeNamePrefix: '@_',
      textNodeName: '#text',
      // Keep element text as strings so identifiers such as "0012345" are
      // not converted to numbers.
      parseTagValue: false
    });
    this.xmlBuilder = new XMLBuilder({
      ignoreAttributes: false,
      attributeNamePrefix: '@_',
      textNodeName: '#text',
      format: true
    });
  }

  /**
   * Builds the AuthnRequest XML sent to the IdP.
   *
   * @param forceAuthn Value written to the `ForceAuthn` attribute.
   * @returns The serialized `samlp:AuthnRequest` document.
   */
  generateAuthNRequest(forceAuthn = false): string {
    const id = `_id_${crypto.randomBytes(16).toString('hex')}`;
    const issueInstant = new Date().toISOString();
    const destination = this.config.entryPoint;
    const authnRequest = {
      'samlp:AuthnRequest': {
        '@_xmlns:samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
        '@_xmlns:saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
        '@_ID': id,
        '@_Version': '2.0',
        '@_IssueInstant': issueInstant,
        '@_Destination': destination,
        // Binding the IdP should use for the *response*.
        '@_ProtocolBinding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST',
        '@_AssertionConsumerServiceURL': this.config.callbackUrl,
        '@_ForceAuthn': forceAuthn.toString(),
        'saml:Issuer': {
          '#text': this.config.issuer
        },
        'samlp:NameIDPolicy': {
          '@_Format': this.config.nameIdFormat,
          '@_AllowCreate': 'true'
        },
        'samlp:RequestedAuthnContext': {
          '@_Comparison': 'minimum',
          '@_xmlns:samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
          'saml:AuthnContextClassRef': {
            '#text': 'urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport'
          }
        }
      }
    };
    return this.xmlBuilder.build(authnRequest);
  }

  /**
   * Builds the IdP redirect URL for the HTTP-Redirect binding.
   *
   * The request is raw-DEFLATE compressed, then base64 encoded;
   * `URLSearchParams` performs the final URL encoding.
   *
   * @param forceAuthn Forwarded to {@link generateAuthNRequest}.
   * @returns `entryPoint` with `SAMLRequest` and `RelayState` query parameters.
   */
  getAuthNRequestUrl(forceAuthn = false): string {
    const authnRequestXml = this.generateAuthNRequest(forceAuthn);
    const deflated = zlib.deflateRawSync(Buffer.from(authnRequestXml, 'utf8'));
    const params = new URLSearchParams({
      SAMLRequest: deflated.toString('base64'),
      RelayState: this.generateRelayState()
    });
    // Respect a query string that is already part of the configured URL.
    const separator = this.config.entryPoint.includes('?') ? '&' : '?';
    return `${this.config.entryPoint}${separator}${params.toString()}`;
  }

  /** Generates a random RelayState value (32 hex characters). */
  private generateRelayState(): string {
    return crypto.randomBytes(16).toString('hex');
  }

  /**
   * Decodes, validates and extracts the contents of a SAML response.
   *
   * @param samlResponse Base64-encoded response (optionally raw-DEFLATE compressed).
   * @returns The extracted response data.
   * @throws Error prefixed with "Failed to process SAML response:" when the
   *   message is malformed, unsigned, or fails any validation step.
   */
  async processSAMLResponse(samlResponse: string): Promise<SAMLResponse> {
    try {
      const xml = SAMLClient.decodeMessage(samlResponse);
      const parsedResponse = this.xmlParser.parse(xml);
      const envelope = parsedResponse?.['samlp:Response'];
      if (!envelope) {
        throw new Error('SAML message has no samlp:Response element');
      }
      const assertion = envelope['saml:Assertion'];
      if (!assertion || Array.isArray(assertion)) {
        throw new Error('SAML response must contain exactly one saml:Assertion');
      }

      await this.validateResponse(parsedResponse, assertion, xml);

      const nameId = SAMLClient.textOf(assertion['saml:Subject']?.['saml:NameID']);
      if (!nameId) {
        throw new Error('SAML assertion has no NameID');
      }

      const response: SAMLResponse = {
        id: envelope['@_ID'],
        inResponseTo: envelope['@_InResponseTo'],
        destination: envelope['@_Destination'],
        issueInstant: envelope['@_IssueInstant'],
        issuer: SAMLClient.textOf(assertion['saml:Issuer']),
        nameId,
        attributes: this.extractAttributes(assertion)
      };

      const conditions = assertion['saml:Conditions'];
      if (conditions) {
        response.conditions = {
          notBefore: conditions['@_NotBefore'],
          notOnOrAfter: conditions['@_NotOnOrAfter']
        };
        if (conditions['saml:AudienceRestriction'] !== undefined) {
          response.conditions.audienceRestriction = SAMLClient.collectAudiences(conditions);
        }
      }
      return response;
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`Failed to process SAML response: ${reason}`);
    }
  }

  /** Collects attribute values from the assertion, keyed by attribute name. */
  private extractAttributes(assertion: any): Record<string, string[]> {
    const attributes: Record<string, string[]> = {};
    for (const statement of SAMLClient.toArray<any>(assertion['saml:AttributeStatement'])) {
      for (const attribute of SAMLClient.toArray<any>(statement?.['saml:Attribute'])) {
        const name = attribute?.['@_Name'];
        if (typeof name !== 'string') continue;
        // An attribute may legitimately carry zero, one or many values.
        attributes[name] = SAMLClient.toArray<unknown>(attribute['saml:AttributeValue'])
          .map((value) => SAMLClient.textOf(value));
      }
    }
    return attributes;
  }

  /**
   * Validates time window, destination, audience and signature(s).
   *
   * @param response Parsed document (root object from the XML parser).
   * @param assertion The parsed `saml:Assertion` node.
   * @param rawXml The XML exactly as received; signatures are verified against it.
   * @throws Error when any check fails or when nothing is signed.
   */
  private async validateResponse(response: any, assertion: any, rawXml: string): Promise<void> {
    const envelope = response['samlp:Response'];
    const conditions = assertion['saml:Conditions'];
    if (!conditions) {
      throw new Error('SAML assertion has no Conditions element');
    }

    // Validate timestamps (each bound is optional, but must parse when present).
    const now = Date.now();
    const skewMs = this.config.acceptedClockSkewMs ?? 0;
    const notBefore = SAMLClient.parseInstant(conditions['@_NotBefore'], 'NotBefore');
    const notOnOrAfter = SAMLClient.parseInstant(conditions['@_NotOnOrAfter'], 'NotOnOrAfter');
    if (notBefore !== undefined && now + skewMs < notBefore) {
      throw new Error('SAML assertion is not yet valid');
    }
    if (notOnOrAfter !== undefined && now - skewMs >= notOnOrAfter) {
      throw new Error('SAML assertion has expired');
    }

    // Validate destination
    if (envelope['@_Destination'] !== this.config.callbackUrl) {
      throw new Error('SAML response destination mismatch');
    }

    // Validate audience
    if (conditions['saml:AudienceRestriction'] !== undefined) {
      const audiences = SAMLClient.collectAudiences(conditions);
      if (!audiences.includes(this.config.issuer)) {
        throw new Error('SAML assertion audience mismatch');
      }
    }

    // Validate signatures. At least one of Response/Assertion must be signed;
    // otherwise anybody could forge a response.
    const responseSigned = envelope['ds:Signature'] !== undefined;
    const assertionSigned = assertion['ds:Signature'] !== undefined;
    if (!responseSigned && !assertionSigned) {
      throw new Error('SAML response is not signed');
    }
    if (responseSigned) {
      await this.validateSignature(rawXml, this.config.cert, 'Response');
    }
    if (assertionSigned) {
      await this.validateSignature(rawXml, this.config.cert, 'Assertion');
    }
  }

  /**
   * Verifies the XML signature that is a direct child of the given element.
   *
   * Verification runs against the XML as received: re-serializing the parsed
   * tree would change the bytes that were signed.
   *
   * @param xml The XML document exactly as received.
   * @param certificate IdP certificate (PEM, or bare base64 body).
   * @param owner Local name of the element expected to carry the signature.
   * @throws Error when no DOM parser is available, the signature is missing,
   *   or the signature does not verify.
   */
  private async validateSignature(
    xml: string,
    certificate: string,
    owner: 'Response' | 'Assertion' = 'Response'
  ): Promise<void> {
    // Node has no built-in DOMParser; one must be provided globally by the host.
    const DomParserImpl = Reflect.get(globalThis, 'DOMParser');
    if (typeof DomParserImpl !== 'function') {
      throw new Error('Cannot verify SAML signature: no DOMParser implementation is available');
    }
    const doc = new DomParserImpl().parseFromString(xml, 'text/xml');
    const candidates = doc.getElementsByTagNameNS('http://www.w3.org/2000/09/xmldsig#', 'Signature');
    let signature: any;
    for (let i = 0; i < candidates.length; i++) {
      if (candidates[i]?.parentNode?.localName === owner) {
        signature = candidates[i];
        break;
      }
    }
    if (!signature) {
      throw new Error('No signature found in SAML response');
    }

    const pem = SAMLClient.toPem(certificate);
    const sig = new xmlCrypto.SignedXml();
    // NOTE(review): `keyInfoProvider` is the provider-style xml-crypto API used
    // by the original code; newer xml-crypto releases may expect a different
    // way of supplying the certificate - confirm against the installed version.
    sig.keyInfoProvider = {
      getKeyInfo: () => '<X509Data></X509Data>',
      // The verifier needs a PEM-armoured certificate, not the bare base64 body.
      getKey: () => pem
    };
    sig.loadSignature(signature);
    if (!sig.checkSignature(xml)) {
      throw new Error('Invalid SAML response signature');
    }
  }

  /**
   * Builds a LogoutRequest for the given session.
   *
   * The request is addressed to `entryPoint`, because the configuration has
   * no separate single-logout URL.
   *
   * @param sessionIndex Session index issued by the IdP at login.
   * @param nameId NameID of the user being logged out.
   * @returns The serialized `samlp:LogoutRequest` document.
   */
  generateLogoutRequest(sessionIndex: string, nameId: string): string {
    const id = `_id_${crypto.randomBytes(16).toString('hex')}`;
    const issueInstant = new Date().toISOString();
    const logoutRequest = {
      'samlp:LogoutRequest': {
        '@_xmlns:samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
        '@_xmlns:saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
        '@_ID': id,
        '@_Version': '2.0',
        '@_IssueInstant': issueInstant,
        '@_Destination': this.config.entryPoint,
        'saml:Issuer': {
          '#text': this.config.issuer
        },
        'saml:NameID': {
          '@_Format': this.config.nameIdFormat,
          '#text': nameId
        },
        'samlp:SessionIndex': {
          '#text': sessionIndex
        }
      }
    };
    return this.xmlBuilder.build(logoutRequest);
  }

  /** Base64-decodes a SAML message, inflating it when it is not already XML. */
  private static decodeMessage(encoded: string): string {
    const raw = Buffer.from(encoded, 'base64');
    const text = raw.toString('utf8');
    if (text.trimStart().startsWith('<')) {
      return text;
    }
    try {
      return zlib.inflateRawSync(raw).toString('utf8');
    } catch {
      // Not compressed after all; let the XML parser report the problem.
      return text;
    }
  }

  /** Normalizes "absent | single | many" parser output to an array. */
  private static toArray<T>(value: T | T[] | null | undefined): T[] {
    if (value === undefined || value === null) return [];
    return Array.isArray(value) ? value : [value];
  }

  /**
   * Returns the text of a parsed element. Elements without attributes are
   * delivered by the parser as plain values; elements with attributes as
   * objects holding the text under `#text`.
   */
  private static textOf(node: unknown): string {
    if (node === undefined || node === null) return '';
    if (typeof node === 'object') {
      const text = (node as Record<string, unknown>)['#text'];
      return text === undefined || text === null ? '' : String(text).trim();
    }
    return String(node).trim();
  }

  /** Flattens every Audience of every AudienceRestriction into one list. */
  private static collectAudiences(conditions: any): string[] {
    return SAMLClient.toArray<any>(conditions['saml:AudienceRestriction'])
      .flatMap((restriction) => SAMLClient.toArray<unknown>(restriction?.['saml:Audience']))
      .map((audience) => SAMLClient.textOf(audience))
      .filter((audience) => audience.length > 0);
  }

  /** Parses an optional timestamp attribute; rejects unparseable values. */
  private static parseInstant(value: unknown, label: string): number | undefined {
    if (value === undefined || value === null || value === '') return undefined;
    const millis = Date.parse(String(value));
    if (Number.isNaN(millis)) {
      throw new Error(`SAML assertion has an invalid ${label} timestamp`);
    }
    return millis;
  }

  /** Returns the certificate in PEM form, adding the armour when it is missing. */
  private static toPem(certificate: string): string {
    if (certificate.includes('-----BEGIN')) {
      return certificate.trim();
    }
    const body = certificate.replace(/\s+/g, '');
    const lines = body.match(/.{1,64}/g) ?? [];
    return ['-----BEGIN CERTIFICATE-----', ...lines, '-----END CERTIFICATE-----'].join('\n');
  }
}
// Sample settings for SAMLClient; replace every placeholder before use.
const samlConfig: SAMLConfig = {
  // Service Provider endpoints
  issuer: 'https://your-sp.com/metadata',
  callbackUrl: 'https://your-sp.com/auth/callback',
  // Identity Provider endpoint
  entryPoint: 'https://your-idp.com/sso',
  // Protocol options
  nameIdFormat: 'urn:oasis:names:tc:SAML:2.0:nameid-format:transient',
  signatureAlgorithm: 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256',
  digestAlgorithm: 'http://www.w3.org/2001/04/xmlenc#sha256',
  // Key material (placeholders)
  cert: `-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----`,
  privateKey: `-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----`
};
// Example usage in Express.js
/*
import express from 'express';
const app = express();
const samlClient = new SAMLClient(samlConfig);
// Initiate SSO
app.get('/auth/saml', (req, res) => {
const authUrl = samlClient.getAuthNRequestUrl();
res.redirect(authUrl);
});
// SSO Callback
app.post('/auth/callback', express.urlencoded({ extended: true }), async (req, res) => {
try {
const samlResponse = req.body.SAMLResponse;
const response = await samlClient.processSAMLResponse(samlResponse);
// Create user session
req.session.user = {
id: response.nameId,
attributes: response.attributes
};
res.redirect('/dashboard');
} catch (error) {
console.error('SAML authentication failed:', error);
res.status(401).send('Authentication failed');
}
});
// Logout
app.post('/auth/logout', (req, res) => {
const logoutRequest = samlClient.generateLogoutRequest(
req.session.sessionIndex,
req.session.user.id
);
// Send logout request to IdP
res.redirect(`${samlConfig.entryPoint}?${new URLSearchParams({
SAMLRequest: zlib.deflateRawSync(Buffer.from(logoutRequest)).toString('base64') // HTTP-Redirect binding: DEFLATE, then base64
})}`);
});
*/
export { SAMLClient, SAMLConfig, SAMLResponse };
💻 Génération de Métadonnées SAML typescript
🟡 intermediate
⭐⭐⭐
Générer des métadonnées SAML 2.0 pour la configuration et la découverte de Service Provider
⏱️ 30 min
🏷️ saml, metadata, configuration
Prerequisites:
SAML 2.0 metadata concepts, TypeScript/JavaScript, XML generation
// SAML 2.0 Metadata Generation
// Generate Service Provider metadata for Identity Provider configuration
import * as crypto from 'crypto';
import { XMLBuilder } from 'fast-xml-parser';
/**
 * Input for generating Service Provider (SP) metadata.
 */
interface SPMetadataConfig {
  // --- Identity and endpoints ---
  /** SP entity ID. */
  entityId: string;
  /** Assertion Consumer Service (ACS) URL. */
  assertionConsumerServiceUrl: string;
  /** Single Logout (SLO) URL. */
  singleLogoutServiceUrl?: string;

  // --- Certificates ---
  /** SP signing certificate. */
  signingCert?: string;
  /** SP encryption certificate. */
  encryptionCert?: string;

  // --- Capabilities ---
  /** Supported NameID formats. */
  nameIdFormats?: string[];
  /** Attribute-consuming services to advertise. */
  attributeConsumingServices?: AttributeConsumingService[];

  // --- Descriptive information ---
  /** Organization that operates the SP. */
  organization?: {
    name: string;
    displayName: string;
    url: string;
  };
  /** Contact published in the metadata. */
  contactPerson?: {
    name: string;
    email: string;
    type: 'technical' | 'support' | 'administrative' | 'billing' | 'other';
  };
}
/**
 * A named service together with the attributes it asks the IdP to release.
 */
interface AttributeConsumingService {
  /** Service name. */
  name: string;
  /** Service description. */
  description: string;
  /** Attributes the service requests. */
  requestedAttributes: {
    /** Attribute name. */
    name: string;
    /** Attribute name format. */
    nameFormat?: string;
    /** Friendly name of the attribute. */
    friendlyName?: string;
    /** Whether the attribute is required. */
    required?: boolean;
  }[];
}
/**
 * Generates SAML 2.0 metadata (an `md:EntityDescriptor`) for a Service Provider.
 *
 * Child elements are emitted in the order required by the SAML metadata
 * schema, because schema-validating consumers reject out-of-order documents.
 */
class SPMetadataGenerator {
  private config: SPMetadataConfig;
  private xmlBuilder: XMLBuilder;

  /**
   * @param config SP description; `nameIdFormats` defaults to transient,
   *   persistent and emailAddress.
   */
  constructor(config: SPMetadataConfig) {
    this.config = {
      nameIdFormats: [
        'urn:oasis:names:tc:SAML:2.0:nameid-format:transient',
        'urn:oasis:names:tc:SAML:2.0:nameid-format:persistent',
        'urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress'
      ],
      ...config
    };
    this.xmlBuilder = new XMLBuilder({
      ignoreAttributes: false,
      attributeNamePrefix: '@_',
      textNodeName: '#text',
      format: true
    });
  }

  /**
   * Builds the complete SP metadata document.
   *
   * @returns The serialized `md:EntityDescriptor`, valid for one year from now.
   */
  generateMetadata(): string {
    const entityId = this.config.entityId;
    const validUntil = new Date();
    validUntil.setFullYear(validUntil.getFullYear() + 1);
    const metadata = {
      'md:EntityDescriptor': {
        '@_xmlns:md': 'urn:oasis:names:tc:SAML:2.0:metadata',
        '@_xmlns:ds': 'http://www.w3.org/2000/09/xmldsig#',
        '@_entityID': entityId,
        '@_validUntil': validUntil.toISOString(),
        '@_ID': `_sp_metadata_${crypto.randomBytes(16).toString('hex')}`,
        'md:SPSSODescriptor': {
          '@_protocolSupportEnumeration': 'urn:oasis:names:tc:SAML:2.0:protocol',
          '@_AuthnRequestsSigned': (this.config.signingCert ? 'true' : 'false'),
          '@_WantAssertionsSigned': 'true',
          // Schema order: KeyDescriptor, SingleLogoutService, NameIDFormat,
          // AssertionConsumerService, AttributeConsumingService.
          ...this.buildKeyDescriptors(),
          ...this.buildSingleLogoutServices(),
          ...this.buildNameIDFormats(),
          ...this.buildAssertionConsumerServices(),
          ...this.buildAttributeConsumingServices()
        },
        ...this.buildOrganization(),
        ...this.buildContactPerson()
      }
    };
    return this.xmlBuilder.build(metadata);
  }

  /** Builds the `md:NameIDFormat` elements (none when the list is empty). */
  private buildNameIDFormats(): Record<string, any> {
    const formats = this.config.nameIdFormats ?? [];
    if (formats.length === 0) {
      return {};
    }
    return { 'md:NameIDFormat': formats.map(format => ({ '#text': format })) };
  }

  /**
   * Builds the `md:AssertionConsumerService` element.
   *
   * Only HTTP-POST is advertised: the Web Browser SSO profile does not allow
   * responses over HTTP-Redirect. Endpoint elements are empty, so the URL is
   * carried only by `Location`.
   */
  private buildAssertionConsumerServices(): Record<string, any> {
    const services = [
      {
        '@_index': '0',
        '@_isDefault': 'true',
        '@_Binding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST',
        '@_Location': this.config.assertionConsumerServiceUrl
      }
    ];
    return { 'md:AssertionConsumerService': services };
  }

  /** Builds the `md:SingleLogoutService` elements (none without an SLO URL). */
  private buildSingleLogoutServices(): Record<string, any> {
    if (!this.config.singleLogoutServiceUrl) {
      return {};
    }
    const services = [
      {
        '@_Binding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST',
        '@_Location': this.config.singleLogoutServiceUrl
      },
      {
        '@_Binding': 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect',
        '@_Location': this.config.singleLogoutServiceUrl
      }
    ];
    return { 'md:SingleLogoutService': services };
  }

  /** Builds the `md:AttributeConsumingService` elements, indexed from 0. */
  private buildAttributeConsumingServices(): Record<string, any> {
    const configured = this.config.attributeConsumingServices ?? [];
    if (configured.length === 0) {
      return {};
    }
    const services = configured.map((service, index) => ({
      '@_index': index.toString(),
      'md:ServiceName': {
        '@_xml:lang': 'en',
        '#text': service.name
      },
      'md:ServiceDescription': {
        '@_xml:lang': 'en',
        '#text': service.description
      },
      'md:RequestedAttribute': service.requestedAttributes.map(attr => ({
        '@_Name': attr.name,
        '@_NameFormat': attr.nameFormat || 'urn:oasis:names:tc:SAML:2.0:attrname-format:basic',
        '@_FriendlyName': attr.friendlyName || attr.name,
        '@_isRequired': attr.required ? 'true' : 'false'
      }))
    }));
    return { 'md:AttributeConsumingService': services };
  }

  /** Builds one `md:KeyDescriptor` per configured certificate. */
  private buildKeyDescriptors(): Record<string, any> {
    const keyDescriptors: Record<string, any>[] = [];
    if (this.config.signingCert) {
      keyDescriptors.push({
        '@_use': 'signing',
        'ds:KeyInfo': this.buildKeyInfo(this.config.signingCert)
      });
    }
    if (this.config.encryptionCert) {
      keyDescriptors.push({
        '@_use': 'encryption',
        'ds:KeyInfo': this.buildKeyInfo(this.config.encryptionCert),
        'md:EncryptionMethod': [
          { '@_Algorithm': 'http://www.w3.org/2001/04/xmlenc#aes128-cbc' },
          { '@_Algorithm': 'http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p' }
        ]
      });
    }
    return keyDescriptors.length > 0 ? { 'md:KeyDescriptor': keyDescriptors } : {};
  }

  /** Wraps a certificate in the `ds:KeyInfo` content structure. */
  private buildKeyInfo(cert: string): Record<string, any> {
    return {
      'ds:X509Data': {
        'ds:X509Certificate': {
          '#text': this.formatCertificate(cert)
        }
      }
    };
  }

  /** Strips the PEM armour and all whitespace (including `\r`) from a certificate. */
  private formatCertificate(cert: string): string {
    return cert
      .replace(/-----BEGIN CERTIFICATE-----|-----END CERTIFICATE-----/g, '')
      .replace(/\s+/g, '');
  }

  /** Builds the `md:Organization` element (none when not configured). */
  private buildOrganization(): Record<string, any> {
    const organization = this.config.organization;
    if (!organization) {
      return {};
    }
    return {
      'md:Organization': {
        'md:OrganizationName': {
          '@_xml:lang': 'en',
          '#text': organization.name
        },
        'md:OrganizationDisplayName': {
          '@_xml:lang': 'en',
          '#text': organization.displayName
        },
        'md:OrganizationURL': {
          '@_xml:lang': 'en',
          '#text': organization.url
        }
      }
    };
  }

  /**
   * Builds the `md:ContactPerson` element (none when not configured).
   *
   * The first word of `name` becomes the given name and the remainder the
   * surname; the surname element is omitted for one-word names.
   */
  private buildContactPerson(): Record<string, any> {
    const contact = this.config.contactPerson;
    if (!contact) {
      return {};
    }
    const [givenName = '', ...rest] = contact.name.trim().split(/\s+/);
    const surName = rest.join(' ');
    return {
      'md:ContactPerson': {
        '@_contactType': contact.type,
        'md:GivenName': { '#text': givenName },
        ...(surName ? { 'md:SurName': { '#text': surName } } : {}),
        'md:EmailAddress': {
          '#text': `mailto:${contact.email}`
        }
      }
    };
  }

  /**
   * Placeholder for metadata signing.
   *
   * Signing is not implemented: the metadata is returned unchanged and
   * `privateKey` is not used.
   *
   * @param metadata Serialized metadata document.
   * @param privateKey Key that a real implementation would sign with.
   * @returns `metadata`, unmodified and unsigned.
   */
  signMetadata(metadata: string, privateKey: string): string {
    console.log('Metadata signing not implemented in this example');
    return metadata;
  }
}
// Example usage: a sample SP description. Every value is a placeholder.
const spMetadataConfig: SPMetadataConfig = {
  entityId: 'https://your-sp.com/metadata',
  assertionConsumerServiceUrl: 'https://your-sp.com/auth/callback',
  singleLogoutServiceUrl: 'https://your-sp.com/auth/logout',
  signingCert: `-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----`,
  encryptionCert: `-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----`,
  attributeConsumingServices: [
    {
      name: 'Default Service',
      description: 'Default service for user authentication',
      requestedAttributes: [
        {
          name: 'email',
          friendlyName: 'Email Address',
          required: true
        },
        {
          name: 'givenName',
          friendlyName: 'First Name',
          required: false
        },
        {
          name: 'sn',
          friendlyName: 'Last Name',
          required: false
        }
      ]
    }
  ],
  organization: {
    name: 'Your Company',
    displayName: 'Your Company Ltd',
    url: 'https://your-company.com'
  },
  contactPerson: {
    name: 'John Doe',
    // Placeholder address; it is published in the metadata as a `mailto:` URI,
    // so it must be a syntactically valid email address.
    email: 'john.doe@example.com',
    type: 'technical'
  }
};
// Generate metadata
// Runs at module load: builds the generator from the sample configuration,
// renders the metadata once, and prints it to stdout.
const metadataGenerator = new SPMetadataGenerator(spMetadataConfig);
// `metadata` is the rendered document; the commented-out Express route below serves it.
const metadata = metadataGenerator.generateMetadata();
console.log(metadata);
// Example Express route to serve metadata
/*
import express from 'express';
const app = express();
app.get('/metadata.xml', (req, res) => {
res.set('Content-Type', 'application/xml');
res.send(metadata);
});
*/
export { SPMetadataGenerator, SPMetadataConfig };
💻 Identity Provider SAML python
🔴 complex
⭐⭐⭐⭐⭐
Implémentation de base d'Identity Provider SAML 2.0 pour l'authentification des utilisateurs
⏱️ 60 min
🏷️ saml, identity provider, authentication, security
Prerequisites:
SAML 2.0 protocol, Python, Cryptography, Web frameworks
#!/usr/bin/env python3
"""
SAML 2.0 Identity Provider Implementation
Basic IdP for issuing SAML assertions and managing user authentication
"""
import base64
import hashlib
import secrets
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, Optional, List, Any
from xml.etree import ElementTree as ET
from xml.dom import minidom
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography import x509
from urllib.parse import urlparse, parse_qs
import zlib
import jwt
class SAMLIdentityProvider:
    """Simple SAML 2.0 Identity Provider.

    Demo-quality implementation with in-memory users, service providers and
    sessions.

    NOTE: responses and metadata produced by this class are NOT signed;
    ``private_key`` is stored but never used. A service provider that requires
    signed assertions will reject them until signing is added.
    """

    DEFAULT_NAME_ID_FORMAT = "urn:oasis:names:tc:SAML:2.0:nameid-format:transient"
    SESSION_LIFETIME = timedelta(hours=8)

    def __init__(
        self,
        entity_id: str,
        sso_url: str,
        slo_url: str,
        private_key: str,
        certificate: str,
        encryption_cert: Optional[str] = None
    ):
        """Store the IdP endpoints and key material and set up the demo stores.

        Args:
            entity_id: Entity ID of this IdP.
            sso_url: Single sign-on endpoint URL.
            slo_url: Single logout endpoint URL.
            private_key: IdP private key (currently unused, see class note).
            certificate: IdP certificate, published in the metadata.
            encryption_cert: Optional encryption certificate (currently unused).
        """
        self.entity_id = entity_id
        self.sso_url = sso_url
        self.slo_url = slo_url
        self.private_key = private_key
        self.certificate = certificate
        self.encryption_cert = encryption_cert
        # User database (in practice, use a real database).
        # Placeholder accounts; each key must be unique or entries overwrite
        # each other.
        self.users: Dict[str, Dict[str, Any]] = {
            'john.doe@example.com': {
                'id': 'john.doe@example.com',
                'email': 'john.doe@example.com',
                'first_name': 'John',
                'last_name': 'Doe',
                'roles': ['user', 'admin'],
                'groups': ['employees', 'developers']
            },
            'jane.smith@example.com': {
                'id': 'jane.smith@example.com',
                'email': 'jane.smith@example.com',
                'first_name': 'Jane',
                'last_name': 'Smith',
                'roles': ['user'],
                'groups': ['employees']
            }
        }
        # Service provider configurations, keyed by SP entity ID
        self.service_providers: Dict[str, Dict[str, Any]] = {}
        # Session storage, keyed by session ID
        self.sessions: Dict[str, Dict[str, Any]] = {}

    def register_service_provider(
        self,
        entity_id: str,
        acs_url: str,
        slo_url: Optional[str] = None,
        name_id_format: str = "urn:oasis:names:tc:SAML:2.0:nameid-format:transient",
        attributes: Optional[List[str]] = None
    ):
        """Register a Service Provider.

        Args:
            entity_id: Entity ID of the SP.
            acs_url: Assertion Consumer Service URL assertions are sent to.
            slo_url: Optional single logout URL of the SP.
            name_id_format: NameID format used when a request does not ask for one.
            attributes: User attributes released to the SP; defaults to
                email, first_name and last_name.
        """
        self.service_providers[entity_id] = {
            'entity_id': entity_id,
            'acs_url': acs_url,
            'slo_url': slo_url,
            'name_id_format': name_id_format,
            'attributes': attributes or ['email', 'first_name', 'last_name']
        }

    def authenticate_user(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Authenticate user credentials.

        Demo only: every known user shares one hard-coded password.

        Returns:
            The user record, or None when the credentials are rejected.
        """
        # In practice, verify against real user database
        user = self.users.get(username)
        if user is None or password is None:
            return None
        # Constant-time comparison; bytes are used so non-ASCII input is accepted.
        if secrets.compare_digest(password.encode('utf-8'), b"password123"):
            return user
        return None

    def create_session(self, user: Dict[str, Any], sp_entity_id: str) -> str:
        """Create a user session and return its random session ID."""
        session_id = secrets.token_urlsafe(32)
        now = datetime.utcnow()
        self.sessions[session_id] = {
            'user': user,
            'sp_entity_id': sp_entity_id,
            'created_at': now,
            'last_accessed': now
        }
        return session_id

    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the session and refresh its idle timer.

        Returns:
            The session, or None when it is unknown or idle for 8 hours or
            more. Expired sessions are removed from the store.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return None
        now = datetime.utcnow()
        if now - session['last_accessed'] < self.SESSION_LIFETIME:
            session['last_accessed'] = now
            return session
        # Drop the expired entry so the store does not grow without bound.
        self.sessions.pop(session_id, None)
        return None

    def parse_authn_request(self, saml_request: str) -> Dict[str, Any]:
        """Parse a SAML AuthnRequest.

        Args:
            saml_request: Base64-encoded request, either plain XML or
                raw-DEFLATE compressed.

        Returns:
            Dict with keys id, issuer, destination,
            assertion_consumer_service_url, force_authn and
            name_id_policy_format (None when the request has no NameIDPolicy).

        Raises:
            ValueError: If the request cannot be decoded or parsed, or its
                root element is not an AuthnRequest.
        """
        try:
            decoded = base64.b64decode(saml_request)
            if decoded.lstrip().startswith(b'<'):
                # Already XML (not compressed)
                xml_bytes = decoded
            else:
                # Raw DEFLATE stream (no zlib header)
                xml_bytes = zlib.decompress(decoded, -15)
            # NOTE(review): the request is untrusted input and ElementTree is
            # not hardened against maliciously constructed XML; consider a
            # hardened parser before production use.
            root = ET.fromstring(xml_bytes)
            ns = {'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
                  'saml': 'urn:oasis:names:tc:SAML:2.0:assertion'}
            if root.tag != f"{{{ns['samlp']}}}AuthnRequest":
                raise ValueError("root element is not samlp:AuthnRequest")
            issuer_element = root.find('saml:Issuer', ns)
            issuer = None
            if issuer_element is not None and issuer_element.text:
                issuer = issuer_element.text.strip()
            authn_request = {
                'id': root.get('ID'),
                'issuer': issuer,
                'destination': root.get('Destination'),
                'assertion_consumer_service_url': root.get('AssertionConsumerServiceURL'),
                'force_authn': root.get('ForceAuthn', 'false').lower() == 'true',
                'name_id_policy_format': None
            }
            # Extract NameIDPolicy
            name_id_policy = root.find('samlp:NameIDPolicy', ns)
            if name_id_policy is not None:
                authn_request['name_id_policy_format'] = name_id_policy.get('Format')
            return authn_request
        except Exception as e:
            raise ValueError(f"Invalid SAML AuthnRequest: {str(e)}") from e

    def create_saml_response(
        self,
        authn_request: Dict[str, Any],
        user: Dict[str, Any],
        session_index: Optional[str] = None
    ) -> str:
        """Create the SAML Response XML for an authenticated user.

        The response is only issued to a registered service provider and is
        always addressed to that provider's registered ACS URL; a URL supplied
        by the request is accepted only if it matches.

        Args:
            authn_request: Request as returned by ``parse_authn_request``.
            user: Authenticated user record.
            session_index: Optional session index for the AuthnStatement.

        Returns:
            The pretty-printed (unsigned) response XML.

        Raises:
            ValueError: If the issuer is not a registered service provider or
                the requested ACS URL differs from the registered one.
        """
        issuer = authn_request.get('issuer')
        sp_config = self.service_providers.get(issuer)
        if sp_config is None:
            raise ValueError(f"Unknown service provider: {issuer!r}")
        requested_acs = authn_request.get('assertion_consumer_service_url')
        if requested_acs and requested_acs != sp_config['acs_url']:
            raise ValueError("AssertionConsumerServiceURL does not match the registered ACS URL")
        destination = sp_config['acs_url']

        # Generate response details
        response_id = f"_id_{uuid.uuid4().hex}"
        now = datetime.utcnow()
        issue_instant = now.strftime('%Y-%m-%dT%H:%M:%SZ')
        assertion_id = f"_assertion_{uuid.uuid4().hex}"
        # Time conditions
        not_before = now - timedelta(minutes=5)
        not_on_or_after = now + timedelta(hours=8)

        # The parsed request always carries the key (possibly None), so fall
        # back explicitly: request -> SP registration -> transient.
        name_id_format = (
            authn_request.get('name_id_policy_format')
            or sp_config.get('name_id_format')
            or self.DEFAULT_NAME_ID_FORMAT
        )
        if name_id_format.endswith('emailAddress'):
            name_id_value = user['email']
        elif name_id_format.endswith('persistent'):
            name_id_value = user['id']
        else:  # transient
            name_id_value = f"transient_{uuid.uuid4().hex}"

        return self._build_response_xml(
            response_id=response_id,
            in_response_to=authn_request.get('id'),
            destination=destination,
            issue_instant=issue_instant,
            assertion_id=assertion_id,
            name_id_format=name_id_format,
            name_id_value=name_id_value,
            user=user,
            not_before=not_before,
            not_on_or_after=not_on_or_after,
            audience=issuer,
            attributes=sp_config.get('attributes', []),
            session_index=session_index
        )

    def _build_response_xml(
        self,
        response_id: str,
        in_response_to: str,
        destination: str,
        issue_instant: str,
        assertion_id: str,
        name_id_format: str,
        name_id_value: str,
        user: Dict[str, Any],
        not_before: datetime,
        not_on_or_after: datetime,
        audience: str,
        attributes: List[str],
        session_index: Optional[str]
    ) -> str:
        """Build the SAML Response XML (Response, Status and one Assertion).

        ``in_response_to`` may be empty/None, in which case the InResponseTo
        attributes are omitted. List or tuple user attributes are emitted as
        one AttributeValue per item.
        """
        ns = {
            'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
            'saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
            'ds': 'http://www.w3.org/2000/09/xmldsig#'
        }
        samlp = f"{{{ns['samlp']}}}"
        saml = f"{{{ns['saml']}}}"
        timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
        # Register namespaces so the serializer uses the conventional prefixes
        for prefix, uri in ns.items():
            ET.register_namespace(prefix, uri)

        # Response element
        response = ET.Element(f"{samlp}Response")
        response.set('ID', response_id)
        if in_response_to:
            response.set('InResponseTo', in_response_to)
        response.set('Version', '2.0')
        response.set('IssueInstant', issue_instant)
        response.set('Destination', destination)
        # Issuer
        ET.SubElement(response, f"{saml}Issuer").text = self.entity_id
        # Status
        status = ET.SubElement(response, f"{samlp}Status")
        status_code = ET.SubElement(status, f"{samlp}StatusCode")
        status_code.set('Value', 'urn:oasis:names:tc:SAML:2.0:status:Success')

        # Assertion
        assertion = ET.SubElement(response, f"{saml}Assertion")
        assertion.set('ID', assertion_id)
        assertion.set('IssueInstant', issue_instant)
        assertion.set('Version', '2.0')
        ET.SubElement(assertion, f"{saml}Issuer").text = self.entity_id

        # Subject
        subject = ET.SubElement(assertion, f"{saml}Subject")
        name_id = ET.SubElement(subject, f"{saml}NameID")
        name_id.set('Format', name_id_format)
        name_id.text = name_id_value
        subject_confirmation = ET.SubElement(subject, f"{saml}SubjectConfirmation")
        subject_confirmation.set('Method', 'urn:oasis:names:tc:SAML:2.0:cm:bearer')
        confirmation_data = ET.SubElement(subject_confirmation, f"{saml}SubjectConfirmationData")
        confirmation_data.set('NotOnOrAfter', not_on_or_after.strftime(timestamp_format))
        confirmation_data.set('Recipient', destination)
        if in_response_to:
            confirmation_data.set('InResponseTo', in_response_to)

        # Conditions
        conditions = ET.SubElement(assertion, f"{saml}Conditions")
        conditions.set('NotBefore', not_before.strftime(timestamp_format))
        conditions.set('NotOnOrAfter', not_on_or_after.strftime(timestamp_format))
        audience_restriction = ET.SubElement(conditions, f"{saml}AudienceRestriction")
        ET.SubElement(audience_restriction, f"{saml}Audience").text = audience

        # AuthnStatement
        authn_statement = ET.SubElement(assertion, f"{saml}AuthnStatement")
        authn_statement.set('AuthnInstant', issue_instant)
        authn_statement.set('SessionIndex', session_index or f"session_{uuid.uuid4().hex}")
        authn_context = ET.SubElement(authn_statement, f"{saml}AuthnContext")
        authn_context_ref = ET.SubElement(authn_context, f"{saml}AuthnContextClassRef")
        authn_context_ref.text = 'urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport'

        # AttributeStatement: only emitted when at least one requested
        # attribute exists on the user, so the statement is never empty.
        released = [name for name in (attributes or []) if name in user]
        if released:
            attribute_statement = ET.SubElement(assertion, f"{saml}AttributeStatement")
            for attr_name in released:
                attribute = ET.SubElement(attribute_statement, f"{saml}Attribute")
                attribute.set('Name', attr_name)
                attribute.set('NameFormat', 'urn:oasis:names:tc:SAML:2.0:attrname-format:basic')
                values = user[attr_name]
                if not isinstance(values, (list, tuple)):
                    values = [values]
                for value in values:
                    ET.SubElement(attribute, f"{saml}AttributeValue").text = str(value)

        # Convert to string and format for readability
        xml_str = ET.tostring(response, encoding='unicode')
        dom = minidom.parseString(xml_str)
        return dom.toprettyxml(indent=" ", encoding=None)

    def generate_idp_metadata(self) -> str:
        """Generate the (unsigned) IdP metadata XML.

        Children of IDPSSODescriptor are emitted in the order the SAML
        metadata schema requires: KeyDescriptor, SingleLogoutService,
        NameIDFormat, SingleSignOnService.
        """
        post_binding = 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST'
        redirect_binding = 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect'

        # Create EntityDescriptor
        entity_descriptor = ET.Element('EntityDescriptor')
        entity_descriptor.set('entityID', self.entity_id)
        entity_descriptor.set('xmlns', 'urn:oasis:names:tc:SAML:2.0:metadata')
        entity_descriptor.set('xmlns:ds', 'http://www.w3.org/2000/09/xmldsig#')
        # IDPSSODescriptor
        idp_descriptor = ET.SubElement(entity_descriptor, 'IDPSSODescriptor')
        idp_descriptor.set('protocolSupportEnumeration', 'urn:oasis:names:tc:SAML:2.0:protocol')
        idp_descriptor.set('WantAuthnRequestsSigned', 'false')
        # KeyDescriptor for signing
        key_descriptor = ET.SubElement(idp_descriptor, 'KeyDescriptor')
        key_descriptor.set('use', 'signing')
        key_info = ET.SubElement(key_descriptor, 'ds:KeyInfo')
        x509_data = ET.SubElement(key_info, 'ds:X509Data')
        x509_certificate = ET.SubElement(x509_data, 'ds:X509Certificate')
        x509_certificate.text = self._format_cert(self.certificate)
        # SingleLogoutService endpoints
        for binding in (post_binding, redirect_binding):
            slo_endpoint = ET.SubElement(idp_descriptor, 'SingleLogoutService')
            slo_endpoint.set('Binding', binding)
            slo_endpoint.set('Location', self.slo_url)
        # NameIDFormat: the formats create_saml_response knows how to produce
        for name_id_uri in (
            'urn:oasis:names:tc:SAML:2.0:nameid-format:transient',
            'urn:oasis:names:tc:SAML:2.0:nameid-format:persistent',
            'urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress',
        ):
            ET.SubElement(idp_descriptor, 'NameIDFormat').text = name_id_uri
        # SingleSignOnService endpoints
        for binding in (post_binding, redirect_binding):
            sso_endpoint = ET.SubElement(idp_descriptor, 'SingleSignOnService')
            sso_endpoint.set('Binding', binding)
            sso_endpoint.set('Location', self.sso_url)
        # Convert to string and format
        xml_str = ET.tostring(entity_descriptor, encoding='unicode')
        dom = minidom.parseString(xml_str)
        return dom.toprettyxml(indent=" ", encoding=None)

    def _format_cert(self, cert: str) -> str:
        """Return the certificate body without PEM armour or any whitespace."""
        body = cert.replace('-----BEGIN CERTIFICATE-----', '').replace('-----END CERTIFICATE-----', '')
        # split()/join removes "\n", "\r\n", tabs and spaces alike
        return ''.join(body.split())
# Example usage and Flask web application
#
# The example is kept as comments: wrapping it in a triple-quoted string does
# not work because the route docstrings inside it would terminate the string.
# Copy it into its own module to run it.
#
# import base64
# import html
# from flask import Flask, request, redirect, render_template_string, make_response, session
#
# app = Flask(__name__)
# app.secret_key = 'your-secret-key'  # replace with a random secret
#
# # Initialize IdP
# idp = SAMLIdentityProvider(
#     entity_id='https://idp.example.com/metadata',
#     sso_url='https://idp.example.com/sso',
#     slo_url='https://idp.example.com/slo',
#     private_key='''-----BEGIN PRIVATE KEY-----
# ...
# -----END PRIVATE KEY-----''',
#     certificate='''-----BEGIN CERTIFICATE-----
# ...
# -----END CERTIFICATE-----'''
# )
#
# # Register service providers
# idp.register_service_provider(
#     entity_id='https://sp.example.com/metadata',
#     acs_url='https://sp.example.com/auth/callback',
#     slo_url='https://sp.example.com/auth/logout',
#     name_id_format='urn:oasis:names:tc:SAML:2.0:nameid-format:emailAddress',
#     attributes=['email', 'first_name', 'last_name', 'roles']
# )
#
# @app.route('/sso')
# def sso():
#     """Handle SAML SSO request"""
#     saml_request = request.args.get('SAMLRequest')
#     relay_state = request.args.get('RelayState')
#     if not saml_request:
#         return "Missing SAMLRequest parameter", 400
#     try:
#         # Parse AuthnRequest
#         authn_request = idp.parse_authn_request(saml_request)
#         # Store request in the Flask session (flask.session, not request.session)
#         session['authn_request'] = authn_request
#         session['relay_state'] = relay_state
#         # Redirect to login page
#         return redirect('/login')
#     except Exception as e:
#         return f"Invalid SAML request: {str(e)}", 400
#
# @app.route('/login', methods=['GET', 'POST'])
# def login():
#     """User login page"""
#     if request.method == 'GET':
#         return render_template_string('''
#             <h1>Login</h1>
#             <form method="post">
#               <div>
#                 <label>Username:</label>
#                 <input type="text" name="username" required>
#               </div>
#               <div>
#                 <label>Password:</label>
#                 <input type="password" name="password" required>
#               </div>
#               <button type="submit">Login</button>
#             </form>
#             <p>Use john.doe@example.com / password123 for demo</p>
#         ''')
#     username = request.form.get('username')
#     password = request.form.get('password')
#     # Authenticate user
#     user = idp.authenticate_user(username, password)
#     if not user:
#         return "Invalid credentials", 401
#     # Get stored AuthnRequest
#     authn_request = session.get('authn_request')
#     if not authn_request:
#         return "No authentication request found", 400
#     # Create session
#     session_id = idp.create_session(user, authn_request['issuer'])
#     # Generate SAML Response
#     saml_response = idp.create_saml_response(authn_request, user, session_id)
#     # Create response form. Values that originate from the request are
#     # HTML-escaped before being placed into the page.
#     acs_url = html.escape(authn_request['assertion_consumer_service_url'], quote=True)
#     encoded_response = base64.b64encode(saml_response.encode()).decode()
#     relay_state = session.get('relay_state')
#     relay_input = ''
#     if relay_state:
#         relay_input = (
#             '<input type="hidden" name="RelayState" '
#             f'value="{html.escape(relay_state, quote=True)}">'
#         )
#     response_form = f'''
#     <html>
#       <body onload="document.forms[0].submit()">
#         <form method="post" action="{acs_url}">
#           <input type="hidden" name="SAMLResponse" value="{encoded_response}">
#           {relay_input}
#           <div>Redirecting...</div>
#         </form>
#       </body>
#     </html>
#     '''
#     # Clear session
#     session.clear()
#     return response_form
#
# @app.route('/metadata')
# def metadata():
#     """Serve IdP metadata"""
#     metadata_xml = idp.generate_idp_metadata()
#     response = make_response(metadata_xml)
#     response.headers['Content-Type'] = 'application/xml'
#     return response
#
# if __name__ == '__main__':
#     # debug=True enables the interactive debugger; never use it in production.
#     app.run(debug=True, port=8000)
# Public API of this module (Python equivalent of an export list).
__all__ = ["SAMLIdentityProvider"]