🎯 Recommended Samples
Balanced sample collections from various categories for you to explore
OAuth 2.0 Samples
OAuth 2.0 implementation examples including authorization flows, client configurations, and security best practices
💻 OAuth 2.0 Client Credentials Flow python
🟢 simple
⭐⭐
Machine-to-machine authentication using Client Credentials Grant
⏱️ 20 min
🏷️ oauth2, python, authentication, api
Prerequisites:
Python basics, HTTP requests, JSON
#!/usr/bin/env python3
"""
OAuth 2.0 Client Credentials Flow Implementation
Used for machine-to-machine authentication where no user interaction is required.
"""
import base64
import secrets
import time
import requests
from typing import Dict, Optional, Any
from urllib.parse import urlencode
class OAuth2ClientCredentials:
"""OAuth 2.0 Client Credentials Grant implementation"""
def __init__(
self,
client_id: str,
client_secret: str,
token_endpoint: str,
scope: Optional[str] = None
):
self.client_id = client_id
self.client_secret = client_secret
self.token_endpoint = token_endpoint
self.scope = scope
self._access_token = None
self._token_expires_at = 0
self._refresh_token = None
def _get_basic_auth_header(self) -> str:
"""Create Basic Authentication header"""
credentials = f"{self.client_id}:{self.client_secret}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded_credentials}"
def _is_token_expired(self) -> bool:
"""Check if the current token is expired or will expire soon"""
return time.time() >= (self._token_expires_at - 60) # 60-second buffer
def get_access_token(self, force_refresh: bool = False) -> str:
"""
Get a valid access token, refreshing if necessary
Args:
force_refresh: Force token refresh even if current token is valid
Returns:
Valid access token
"""
if not force_refresh and self._access_token and not self._is_token_expired():
return self._access_token
return self._fetch_new_token()
def _fetch_new_token(self) -> str:
"""Fetch a new access token from the authorization server"""
token_data = {
'grant_type': 'client_credentials'
}
if self.scope:
token_data['scope'] = self.scope
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': self._get_basic_auth_header()
}
try:
response = requests.post(
self.token_endpoint,
data=token_data,
headers=headers
)
response.raise_for_status()
token_info = response.json()
self._access_token = token_info['access_token']
# Set token expiration (default to 1 hour if not specified)
expires_in = token_info.get('expires_in', 3600)
self._token_expires_at = time.time() + expires_in
# Store additional token info
self._token_type = token_info.get('token_type', 'Bearer')
self._token_scope = token_info.get('scope', self.scope)
print(f"Successfully obtained new access token. Expires in {expires_in} seconds.")
return self._access_token
except requests.exceptions.RequestException as e:
print(f"Error fetching token: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Response content: {e.response.text}")
raise
def make_authenticated_request(
self,
method: str,
url: str,
**kwargs
) -> requests.Response:
"""
Make an authenticated API request
Args:
method: HTTP method (GET, POST, PUT, DELETE, etc.)
url: Request URL
**kwargs: Additional arguments passed to requests
Returns:
requests.Response object
"""
# Ensure we have a valid token
token = self.get_access_token()
# Add authorization header
headers = kwargs.pop('headers', {})
headers['Authorization'] = f"Bearer {token}"
# Make the request
response = requests.request(method, url, headers=headers, **kwargs)
# If we get a 401, try refreshing the token once
if response.status_code == 401:
print("Token expired, refreshing...")
token = self.get_access_token(force_refresh=True)
headers['Authorization'] = f"Bearer {token}"
response = requests.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
return response
def get(self, url: str, **kwargs) -> requests.Response:
"""Make authenticated GET request"""
return self.make_authenticated_request('GET', url, **kwargs)
def post(self, url: str, **kwargs) -> requests.Response:
"""Make authenticated POST request"""
return self.make_authenticated_request('POST', url, **kwargs)
def put(self, url: str, **kwargs) -> requests.Response:
"""Make authenticated PUT request"""
return self.make_authenticated_request('PUT', url, **kwargs)
def delete(self, url: str, **kwargs) -> requests.Response:
"""Make authenticated DELETE request"""
return self.make_authenticated_request('DELETE', url, **kwargs)
def revoke_token(self) -> bool:
"""Revoke the current access token"""
if not self._access_token:
return True
try:
revoke_data = {
'token': self._access_token,
'token_type_hint': 'access_token'
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': self._get_basic_auth_header()
}
revoke_endpoint = self.token_endpoint.replace('/token', '/revoke')
response = requests.post(revoke_endpoint, data=revoke_data, headers=headers)
response.raise_for_status()
# Clear stored token
self._access_token = None
self._token_expires_at = 0
print("Token successfully revoked")
return True
except requests.exceptions.RequestException as e:
print(f"Error revoking token: {e}")
return False
# Example usage
if __name__ == "__main__":
# Configuration
config = {
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"token_endpoint": "https://auth.example.com/oauth/token",
"scope": "api:read api:write"
}
# Initialize OAuth2 client
client = OAuth2ClientCredentials(**config)
try:
# Get access token
token = client.get_access_token()
print(f"Access token: {token[:20]}...")
# Make authenticated API requests
api_base_url = "https://api.example.com"
# GET request
response = client.get(f"{api_base_url}/users")
users = response.json()
print(f"Found {len(users)} users")
# POST request
new_user = {
"name": "John Doe",
"email": "[email protected]"
}
response = client.post(f"{api_base_url}/users", json=new_user)
created_user = response.json()
print(f"Created user: {created_user['id']}")
# PUT request
updated_user = {**created_user, "name": "John Smith"}
response = client.put(f"{api_base_url}/users/{created_user['id']}", json=updated_user)
print("User updated successfully")
# DELETE request
response = client.delete(f"{api_base_url}/users/{created_user['id']}")
print("User deleted successfully")
# Revoke token when done
client.revoke_token()
except Exception as e:
print(f"Error: {e}")
💻 OAuth 2.0 Authorization Code Flow javascript
🟡 intermediate
⭐⭐⭐
Complete implementation of OAuth 2.0 Authorization Code Flow with PKCE
⏱️ 30 min
🏷️ oauth2, security, authentication, authorization
Prerequisites:
HTTP basics, JavaScript ES6+, Cryptography basics
// OAuth 2.0 Authorization Code Flow with PKCE Implementation
const crypto = require('crypto');
const axios = require('axios');
class OAuth2Client {
constructor(config) {
this.clientId = config.clientId;
this.clientSecret = config.clientSecret;
this.redirectUri = config.redirectUri;
this.authServer = config.authServer;
this.tokenEndpoint = config.tokenEndpoint;
this.scope = config.scope || 'read';
}
// Generate PKCE verifier and challenge
generatePKCE() {
const verifier = crypto.randomBytes(32).toString('base64url');
const challenge = crypto.createHash('sha256')
.update(verifier)
.digest('base64url');
return { verifier, challenge };
}
// Build authorization URL
buildAuthorizationUrl(state, codeChallenge, additionalParams = {}) {
const params = new URLSearchParams({
response_type: 'code',
client_id: this.clientId,
redirect_uri: this.redirectUri,
scope: this.scope,
state: state,
code_challenge: codeChallenge,
code_challenge_method: 'S256',
...additionalParams
});
return `${this.authServer}/authorize?${params.toString()}`;
}
// Exchange authorization code for tokens
async exchangeCodeForTokens(code, codeVerifier, state) {
try {
const response = await axios.post(this.tokenEndpoint,
new URLSearchParams({
grant_type: 'authorization_code',
client_id: this.clientId,
client_secret: this.clientSecret,
code: code,
redirect_uri: this.redirectUri,
code_verifier: codeVerifier,
state: state
}),
{
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
}
);
return {
accessToken: response.data.access_token,
refreshToken: response.data.refresh_token,
tokenType: response.data.token_type,
expiresIn: response.data.expires_in,
scope: response.data.scope
};
} catch (error) {
console.error('Token exchange failed:', error.response?.data || error.message);
throw new Error('Failed to exchange authorization code for tokens');
}
}
// Refresh access token
async refreshAccessToken(refreshToken) {
try {
const response = await axios.post(this.tokenEndpoint,
new URLSearchParams({
grant_type: 'refresh_token',
refresh_token: refreshToken,
client_id: this.clientId,
client_secret: this.clientSecret
}),
{
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
}
);
return {
accessToken: response.data.access_token,
refreshToken: response.data.refresh_token || refreshToken,
expiresIn: response.data.expires_in
};
} catch (error) {
console.error('Token refresh failed:', error.response?.data || error.message);
throw new Error('Failed to refresh access token');
}
}
// Revoke token
async revokeToken(token, tokenTypeHint = 'access_token') {
try {
await axios.post(`${this.authServer}/revoke`,
new URLSearchParams({
token: token,
token_type_hint: tokenTypeHint,
client_id: this.clientId,
client_secret: this.clientSecret
}),
{
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
}
}
);
return true;
} catch (error) {
console.error('Token revocation failed:', error.response?.data || error.message);
return false;
}
}
// Validate token (introspection)
async validateToken(token) {
try {
const response = await axios.post(`${this.authServer}/introspect`,
new URLSearchParams({
token: token,
client_id: this.clientId,
client_secret: this.clientSecret
}),
{
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
}
}
);
return {
active: response.data.active,
scope: response.data.scope,
clientId: response.data.client_id,
username: response.data.username,
expiresAt: response.data.exp,
issuedAt: response.data.iat
};
} catch (error) {
console.error('Token validation failed:', error.response?.data || error.message);
return { active: false };
}
}
}
// Usage Example
const oauth2Config = {
clientId: 'your-client-id',
clientSecret: 'your-client-secret',
redirectUri: 'https://your-app.com/callback',
authServer: 'https://auth.example.com',
tokenEndpoint: 'https://auth.example.com/oauth/token',
scope: 'read write profile'
};
const oauth2Client = new OAuth2Client(oauth2Config);
// Example: Authorization flow
async function initiateAuthorization() {
const state = crypto.randomBytes(16).toString('hex');
const { verifier, challenge } = oauth2Client.generatePKCE();
// Store state and verifier in session for later verification
sessionStorage.setItem('oauth_state', state);
sessionStorage.setItem('code_verifier', verifier);
const authUrl = oauth2Client.buildAuthorizationUrl(state, challenge);
// Redirect user to authorization server
console.log('Redirect to:', authUrl);
// window.location.href = authUrl;
}
// Example: Handle callback
async function handleCallback(code, state, receivedState) {
const storedState = sessionStorage.getItem('oauth_state');
const codeVerifier = sessionStorage.getItem('code_verifier');
if (state !== storedState) {
throw new Error('Invalid state parameter - possible CSRF attack');
}
const tokens = await oauth2Client.exchangeCodeForTokens(code, codeVerifier, state);
// Store tokens securely
localStorage.setItem('access_token', tokens.accessToken);
localStorage.setItem('refresh_token', tokens.refreshToken);
sessionStorage.removeItem('oauth_state');
sessionStorage.removeItem('code_verifier');
return tokens;
}
// Example: API request with token
async function makeAuthenticatedRequest(url, accessToken) {
try {
const response = await axios.get(url, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Accept': 'application/json'
}
});
return response.data;
} catch (error) {
if (error.response?.status === 401) {
// Token expired, try to refresh
const refreshToken = localStorage.getItem('refresh_token');
if (refreshToken) {
const newTokens = await oauth2Client.refreshAccessToken(refreshToken);
localStorage.setItem('access_token', newTokens.accessToken);
localStorage.setItem('refresh_token', newTokens.refreshToken);
// Retry request with new token
return makeAuthenticatedRequest(url, newTokens.accessToken);
}
}
throw error;
}
}
// Export for use in other modules
module.exports = { OAuth2Client };
💻 OAuth 2.0 Resource Server typescript
🔴 complex
⭐⭐⭐⭐
Implementation of a protected API resource server that validates OAuth 2.0 tokens
⏱️ 45 min
🏷️ oauth2, security, typescript, api, middleware
Prerequisites:
Node.js/Express, JWT, TypeScript, OAuth 2.0 concepts
// OAuth 2.0 Resource Server Implementation
// Protects API endpoints with OAuth 2.0 token validation
import express from 'express';
import jwt from 'jsonwebtoken';
import axios from 'axios';
import rateLimit from 'express-rate-limit';
// Configuration interface
interface OAuth2Config {
issuer: string;
audience: string;
introspectionEndpoint: string;
jwksUri: string;
requiredScope?: string[];
}
// Token information interface
interface TokenInfo {
active: boolean;
scope?: string;
clientId?: string;
username?: string;
expiresAt?: number;
issuedAt?: number;
sub?: string;
aud?: string[];
iss?: string;
}
// JWT Key Store
class JWKSKeyStore {
private keys: Map<string, any> = new Map();
private lastRefresh: number = 0;
private refreshInterval: number = 3600000; // 1 hour
constructor(private jwksUri: string) {}
async getKey(kid: string): Promise<any> {
if (Date.now() - this.lastRefresh > this.refreshInterval) {
await this.refreshKeys();
}
const key = this.keys.get(kid);
if (!key) {
await this.refreshKeys();
return this.keys.get(kid);
}
return key;
}
private async refreshKeys(): Promise<void> {
try {
const response = await axios.get(this.jwksUri);
const jwks = response.data;
this.keys.clear();
for (const jwk of jwks.keys) {
this.keys.set(jwk.kid, jwk);
}
this.lastRefresh = Date.now();
console.log('JWKS keys refreshed successfully');
} catch (error) {
console.error('Failed to refresh JWKS keys:', error);
throw new Error('Unable to refresh JWKS keys');
}
}
}
// OAuth 2.0 Authentication Middleware
class OAuth2Middleware {
private keyStore: JWKSKeyStore;
constructor(private config: OAuth2Config) {
this.keyStore = new JWKSKeyStore(config.jwksUri);
}
// Main authentication middleware
authenticate() {
return async (req: express.Request, res: express.Response, next: express.NextFunction) => {
try {
const token = this.extractToken(req);
if (!token) {
return res.status(401).json({ error: 'Missing access token' });
}
const tokenInfo = await this.validateToken(token);
if (!tokenInfo.active) {
return res.status(401).json({ error: 'Invalid or expired token' });
}
if (!this.validateAudience(tokenInfo)) {
return res.status(403).json({ error: 'Invalid audience' });
}
if (!this.validateScope(tokenInfo)) {
return res.status(403).json({ error: 'Insufficient scope' });
}
// Attach token info to request
(req as any).user = {
sub: tokenInfo.sub,
username: tokenInfo.username,
clientId: tokenInfo.clientId,
scope: tokenInfo.scope?.split(' ') || []
};
next();
} catch (error) {
console.error('Authentication error:', error);
res.status(401).json({ error: 'Authentication failed' });
}
};
}
// Scope-based authorization middleware
requireScope(requiredScopes: string[]) {
return (req: express.Request, res: express.Response, next: express.NextFunction) => {
const user = (req as any).user;
if (!user || !user.scope) {
return res.status(403).json({ error: 'No scope information available' });
}
const hasRequiredScope = requiredScopes.every(scope =>
user.scope.includes(scope)
);
if (!hasRequiredScope) {
return res.status(403).json({
error: 'Insufficient scope',
required: requiredScopes,
provided: user.scope
});
}
next();
};
}
private extractToken(req: express.Request): string | null {
const authHeader = req.headers.authorization;
if (!authHeader) {
return null;
}
const parts = authHeader.split(' ');
if (parts.length !== 2 || parts[0] !== 'Bearer') {
return null;
}
return parts[1];
}
private async validateToken(token: string): Promise<TokenInfo> {
try {
// Try to validate as JWT first
const decoded = jwt.decode(token, { complete: true });
if (decoded && typeof decoded === 'object' && decoded.header.kid) {
return await this.validateJWT(token, decoded);
} else {
// Fallback to introspection
return await this.introspectToken(token);
}
} catch (error) {
// If JWT validation fails, try introspection
return await this.introspectToken(token);
}
}
private async validateJWT(token: string, decoded: any): Promise<TokenInfo> {
const header = decoded.header;
const payload = decoded.payload;
// Get signing key
const key = await this.keyStore.getKey(header.kid);
if (!key) {
throw new Error('Signing key not found');
}
// Verify JWT signature and claims
const verified = jwt.verify(token, this.getPublicKey(key), {
issuer: this.config.issuer,
audience: this.config.audience
});
return {
active: true,
sub: verified.sub,
scope: verified.scope,
clientId: verified.client_id || verified.azp,
username: verified.preferred_username || verified.username,
expiresAt: verified.exp,
issuedAt: verified.iat,
aud: verified.aud,
iss: verified.iss
};
}
private async introspectToken(token: string): Promise<TokenInfo> {
// Note: This requires client credentials for the resource server
// In a real implementation, these should be stored securely
const response = await axios.post(this.config.introspectionEndpoint,
new URLSearchParams({
token: token,
token_type_hint: 'access_token'
}),
{
auth: {
username: process.env.RESOURCE_SERVER_CLIENT_ID || '',
password: process.env.RESOURCE_SERVER_CLIENT_SECRET || ''
},
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
}
}
);
const data = response.data;
return {
active: data.active,
sub: data.sub,
scope: data.scope,
clientId: data.client_id,
username: data.username,
expiresAt: data.exp,
issuedAt: data.iat,
aud: data.aud,
iss: data.iss
};
}
private validateAudience(tokenInfo: TokenInfo): boolean {
if (!tokenInfo.aud) return false;
const audiences = Array.isArray(tokenInfo.aud) ? tokenInfo.aud : [tokenInfo.aud];
return audiences.includes(this.config.audience);
}
private validateScope(tokenInfo: TokenInfo): boolean {
if (!this.config.requiredScope || this.config.requiredScope.length === 0) {
return true;
}
if (!tokenInfo.scope) return false;
const tokenScopes = tokenInfo.scope.split(' ');
return this.config.requiredScope.every(required =>
tokenScopes.includes(required)
);
}
private getPublicKey(jwk: any): string {
// Convert JWK to PEM format
// This is a simplified version - in production, use a proper library like 'jwk-to-pem'
const modulus = Buffer.from(jwk.n, 'base64url').toString('base64');
const exponent = Buffer.from(jwk.e, 'base64url').toString('base64');
const body = [
'-----BEGIN PUBLIC KEY-----',
...this.chunkString(`MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA${modulus}${exponent}`, 64),
'-----END PUBLIC KEY-----'
];
return body.join('\n');
}
private chunkString(str: string, size: number): string[] {
const chunks: string[] = [];
for (let i = 0; i < str.length; i += size) {
chunks.push(str.slice(i, i + size));
}
return chunks;
}
}
// Example API Server Setup
function createResourceServer(): express.Application {
const app = express();
// Configuration
const oauth2Config: OAuth2Config = {
issuer: process.env.OAUTH2_ISSUER || 'https://auth.example.com',
audience: process.env.OAUTH2_AUDIENCE || 'api://my-resource-server',
introspectionEndpoint: process.env.OAUTH2_INTROSPECTION_ENDPOINT || 'https://auth.example.com/introspect',
jwksUri: process.env.OAUTH2_JWKS_URI || 'https://auth.example.com/.well-known/jwks.json',
requiredScope: ['api:read']
};
const oauth2 = new OAuth2Middleware(oauth2Config);
// Middleware
app.use(express.json());
// Rate limiting
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100 // limit each IP to 100 requests per windowMs
});
app.use(limiter);
// Health check (no auth required)
app.get('/health', (req, res) => {
res.json({ status: 'healthy', timestamp: new Date().toISOString() });
});
// Protected routes
const apiRouter = express.Router();
apiRouter.use(oauth2.authenticate());
// User profile - requires authentication
apiRouter.get('/profile', (req, res) => {
const user = (req as any).user;
res.json({
sub: user.sub,
username: user.username,
clientId: user.clientId,
scopes: user.scope
});
});
// Read users - requires api:read scope
apiRouter.get('/users', oauth2.requireScope(['api:read']), (req, res) => {
// Mock data
const users = [
{ id: 1, name: 'John Doe', email: '[email protected]' },
{ id: 2, name: 'Jane Smith', email: '[email protected]' }
];
res.json(users);
});
// Create user - requires api:write scope
apiRouter.post('/users',
oauth2.requireScope(['api:write']),
(req, res) => {
const { name, email } = req.body;
if (!name || !email) {
return res.status(400).json({ error: 'Name and email are required' });
}
// Mock user creation
const newUser = {
id: Date.now(),
name,
email,
createdAt: new Date().toISOString()
};
res.status(201).json(newUser);
}
);
// Admin endpoint - requires admin scope
apiRouter.get('/admin/stats',
oauth2.requireScope(['api:admin']),
(req, res) => {
res.json({
totalUsers: 1000,
activeUsers: 850,
totalRequests: 50000,
uptime: '99.9%'
});
}
);
app.use('/api', apiRouter);
// Error handling
app.use((err: Error, req: express.Request, res: express.Response, next: express.NextFunction) => {
console.error('Unhandled error:', err);
res.status(500).json({ error: 'Internal server error' });
});
return app;
}
// Start server
if (require.main === module) {
const port = process.env.PORT || 3000;
const app = createResourceServer();
app.listen(port, () => {
console.log(`Resource server listening on port ${port}`);
});
}
export { OAuth2Middleware, createResourceServer, OAuth2Config };