Elysia Tools
Navigation
Authentication & Authorization
OAuth 2.0 Samples
OAuth 2.0 implementation examples including authorization flows, client configurations, and security best practices
Samples
Entries inside this sample collection
OAuth 2.0 Authorization Code Flow
Complete implementation of OAuth 2.0 Authorization Code Flow with PKCE
Difficulty
6/10
Estimated time
30 min
Tags
oauth2, security, authentication, authorization
Prerequisites
HTTP basics, JavaScript ES6+, Cryptography basics
// OAuth 2.0 Authorization Code Flow with PKCE Implementation
const crypto = require('crypto');
const axios = require('axios');
/**
 * OAuth 2.0 client for the Authorization Code flow with PKCE.
 *
 * Builds the authorization URL and performs the token, refresh, revocation
 * and introspection calls against one authorization server.
 */
class OAuth2Client {
  /**
   * @param {object} config
   * @param {string} config.clientId - Client identifier sent with every request.
   * @param {string} [config.clientSecret] - Client secret; left out of requests when not set.
   * @param {string} config.redirectUri - Redirect URI sent on authorization and token requests.
   * @param {string} config.authServer - Base URL; `/authorize`, `/revoke` and `/introspect` are appended to it.
   * @param {string} config.tokenEndpoint - Full URL of the token endpoint.
   * @param {string} [config.scope] - Space-separated scopes; falls back to 'read' when empty or missing.
   */
  constructor(config) {
    this.clientId = config.clientId;
    this.clientSecret = config.clientSecret;
    this.redirectUri = config.redirectUri;
    this.authServer = config.authServer;
    this.tokenEndpoint = config.tokenEndpoint;
    this.scope = config.scope || 'read';
  }

  /**
   * Build a form/query parameter set, skipping fields that are null or
   * undefined. Passing them straight to URLSearchParams would send the
   * literal strings "undefined"/"null" to the server.
   *
   * @param {Record<string, unknown>} fields
   * @returns {URLSearchParams}
   */
  static #toFormParams(fields) {
    const params = new URLSearchParams();
    for (const [name, value] of Object.entries(fields)) {
      if (value !== undefined && value !== null) {
        params.append(name, value);
      }
    }
    return params;
  }

  /**
   * Generate a PKCE verifier and its S256 challenge.
   *
   * @returns {{ verifier: string, challenge: string }} base64url-encoded pair;
   *   the challenge is the SHA-256 digest of the verifier string.
   */
  generatePKCE() {
    const verifier = crypto.randomBytes(32).toString('base64url');
    const challenge = crypto.createHash('sha256')
      .update(verifier)
      .digest('base64url');
    return { verifier, challenge };
  }

  /**
   * Build the URL the user is redirected to in order to authorize the client.
   *
   * @param {string} state - Opaque value echoed back on the callback.
   * @param {string} codeChallenge - PKCE challenge from generatePKCE().
   * @param {Record<string, string>} [additionalParams] - Extra query parameters;
   *   entries with the same name as a standard parameter replace it.
   * @returns {string} `${authServer}/authorize?...`
   */
  buildAuthorizationUrl(state, codeChallenge, additionalParams = {}) {
    const params = OAuth2Client.#toFormParams({
      response_type: 'code',
      client_id: this.clientId,
      redirect_uri: this.redirectUri,
      scope: this.scope,
      state,
      code_challenge: codeChallenge,
      code_challenge_method: 'S256',
      ...additionalParams,
    });
    return `${this.authServer}/authorize?${params.toString()}`;
  }

  /**
   * Exchange an authorization code for tokens.
   *
   * @param {string} code - Authorization code received on the callback.
   * @param {string} codeVerifier - PKCE verifier generated for this flow.
   * @param {string} [state] - Forwarded to the token endpoint only when provided.
   * @returns {Promise<{accessToken: string, refreshToken: string, tokenType: string, expiresIn: number, scope: string}>}
   * @throws {Error} 'Failed to exchange authorization code for tokens'; the
   *   underlying failure is available as `error.cause`.
   */
  async exchangeCodeForTokens(code, codeVerifier, state) {
    try {
      const response = await axios.post(
        this.tokenEndpoint,
        OAuth2Client.#toFormParams({
          grant_type: 'authorization_code',
          client_id: this.clientId,
          client_secret: this.clientSecret,
          code,
          redirect_uri: this.redirectUri,
          code_verifier: codeVerifier,
          state,
        }),
        {
          headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
          },
        },
      );
      return {
        accessToken: response.data.access_token,
        refreshToken: response.data.refresh_token,
        tokenType: response.data.token_type,
        expiresIn: response.data.expires_in,
        scope: response.data.scope,
      };
    } catch (error) {
      console.error('Token exchange failed:', error.response?.data || error.message);
      throw new Error('Failed to exchange authorization code for tokens', { cause: error });
    }
  }

  /**
   * Obtain a new access token from a refresh token.
   *
   * @param {string} refreshToken
   * @returns {Promise<{accessToken: string, refreshToken: string, expiresIn: number}>}
   *   `refreshToken` is the rotated token when the server returns one,
   *   otherwise the token that was passed in.
   * @throws {Error} 'Failed to refresh access token'; the underlying failure
   *   is available as `error.cause`.
   */
  async refreshAccessToken(refreshToken) {
    try {
      const response = await axios.post(
        this.tokenEndpoint,
        OAuth2Client.#toFormParams({
          grant_type: 'refresh_token',
          refresh_token: refreshToken,
          client_id: this.clientId,
          client_secret: this.clientSecret,
        }),
        {
          headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
          },
        },
      );
      return {
        accessToken: response.data.access_token,
        refreshToken: response.data.refresh_token || refreshToken,
        expiresIn: response.data.expires_in,
      };
    } catch (error) {
      console.error('Token refresh failed:', error.response?.data || error.message);
      throw new Error('Failed to refresh access token', { cause: error });
    }
  }

  /**
   * Revoke a token at `${authServer}/revoke`. Best effort: failures are
   * logged and reported through the return value instead of thrown.
   *
   * @param {string} token
   * @param {string} [tokenTypeHint='access_token']
   * @returns {Promise<boolean>} true when the request succeeded, false otherwise.
   */
  async revokeToken(token, tokenTypeHint = 'access_token') {
    try {
      await axios.post(
        `${this.authServer}/revoke`,
        OAuth2Client.#toFormParams({
          token,
          token_type_hint: tokenTypeHint,
          client_id: this.clientId,
          client_secret: this.clientSecret,
        }),
        {
          headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
          },
        },
      );
      return true;
    } catch (error) {
      console.error('Token revocation failed:', error.response?.data || error.message);
      return false;
    }
  }

  /**
   * Introspect a token at `${authServer}/introspect`. Any request failure is
   * logged and reported as an inactive token.
   *
   * @param {string} token
   * @returns {Promise<{active: boolean, scope?: string, clientId?: string, username?: string, expiresAt?: number, issuedAt?: number}>}
   */
  async validateToken(token) {
    try {
      const response = await axios.post(
        `${this.authServer}/introspect`,
        OAuth2Client.#toFormParams({
          token,
          client_id: this.clientId,
          client_secret: this.clientSecret,
        }),
        {
          headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
          },
        },
      );
      return {
        active: response.data.active,
        scope: response.data.scope,
        clientId: response.data.client_id,
        username: response.data.username,
        expiresAt: response.data.exp,
        issuedAt: response.data.iat,
      };
    } catch (error) {
      console.error('Token validation failed:', error.response?.data || error.message);
      return { active: false };
    }
  }
}
// Usage Example
// Placeholder settings for the example flow below; replace every value with
// the ones registered for your application at the authorization server.
const oauth2Config = {
  authServer: 'https://auth.example.com',
  tokenEndpoint: 'https://auth.example.com/oauth/token',
  redirectUri: 'https://your-app.com/callback',
  clientId: 'your-client-id',
  clientSecret: 'your-client-secret',
  scope: 'read write profile',
};

// Single client instance shared by the example functions.
const oauth2Client = new OAuth2Client(oauth2Config);
// Example: Authorization flow
/**
 * Start the authorization flow: create a random `state` and a PKCE pair,
 * remember the state and verifier in sessionStorage for the callback, and
 * build the authorization URL.
 *
 * @returns {Promise<string>} The authorization URL the user should be sent to.
 */
async function initiateAuthorization() {
  const state = crypto.randomBytes(16).toString('hex');
  const { verifier, challenge } = oauth2Client.generatePKCE();

  // Kept for handleCallback(), which compares the state and sends the verifier.
  sessionStorage.setItem('oauth_state', state);
  sessionStorage.setItem('code_verifier', verifier);

  const authUrl = oauth2Client.buildAuthorizationUrl(state, challenge);

  // The example only logs the URL; a browser app would redirect instead,
  // e.g. window.location.href = authUrl;
  console.log('Redirect to:', authUrl);

  // Returned so the caller can perform the redirect itself.
  return authUrl;
}
// Example: Handle callback
/**
 * Handle the redirect back from the authorization server: verify the state,
 * exchange the code for tokens and persist them in localStorage.
 *
 * @param {string} code - Authorization code from the callback.
 * @param {string} state - Value forwarded to the token exchange.
 * @param {string} receivedState - State returned on the callback; compared
 *   against the value stored when the flow was started.
 * @returns {Promise<object>} The tokens returned by the exchange.
 * @throws {Error} When no state was stored, the state does not match, the
 *   PKCE verifier is missing, or the token exchange fails.
 */
async function handleCallback(code, state, receivedState) {
  const storedState = sessionStorage.getItem('oauth_state');
  const codeVerifier = sessionStorage.getItem('code_verifier');

  // A missing stored state must fail: without this guard a callback whose
  // state is also null would compare equal and skip the CSRF check.
  if (!storedState || receivedState !== storedState) {
    throw new Error('Invalid state parameter - possible CSRF attack');
  }

  // State and verifier are single-use: drop them as soon as the state has
  // matched, so they are gone even if the exchange below fails.
  sessionStorage.removeItem('oauth_state');
  sessionStorage.removeItem('code_verifier');

  if (!codeVerifier) {
    throw new Error('Missing PKCE code verifier - restart the authorization flow');
  }

  const tokens = await oauth2Client.exchangeCodeForTokens(code, codeVerifier, state);

  localStorage.setItem('access_token', tokens.accessToken);
  if (tokens.refreshToken) {
    localStorage.setItem('refresh_token', tokens.refreshToken);
  } else {
    // No refresh token issued: storing it would persist the string
    // "undefined", which later code would treat as a usable token.
    localStorage.removeItem('refresh_token');
  }

  return tokens;
}
// Example: API request with token
/**
 * GET a protected resource with a bearer token. On a 401 response the stored
 * refresh token is used to obtain new tokens and the request is retried once.
 *
 * @param {string} url - Resource URL.
 * @param {string} accessToken - Bearer token to send.
 * @param {boolean} [allowRefresh=true] - Whether a 401 may trigger a token
 *   refresh and retry. The retry passes false, so a server that keeps
 *   answering 401 cannot cause an endless refresh loop.
 * @returns {Promise<*>} The response body.
 * @throws The original request error when it is not a 401, no refresh token
 *   is stored, or the retry fails; the refresh error when refreshing fails.
 */
async function makeAuthenticatedRequest(url, accessToken, allowRefresh = true) {
  try {
    const response = await axios.get(url, {
      headers: {
        'Authorization': `Bearer ${accessToken}`,
        'Accept': 'application/json',
      },
    });
    return response.data;
  } catch (error) {
    if (!allowRefresh || error.response?.status !== 401) {
      throw error;
    }

    const refreshToken = localStorage.getItem('refresh_token');
    if (!refreshToken) {
      throw error;
    }

    // Token presumably expired: refresh, persist the new pair, retry once.
    const newTokens = await oauth2Client.refreshAccessToken(refreshToken);
    localStorage.setItem('access_token', newTokens.accessToken);
    localStorage.setItem('refresh_token', newTokens.refreshToken);
    return makeAuthenticatedRequest(url, newTokens.accessToken, false);
  }
}
// Export for use in other modules
module.exports = { OAuth2Client };
OAuth 2.0 Client Credentials Flow
Machine-to-machine authentication using Client Credentials Grant
Difficulty
4/10
Estimated time
20 min
Tags
oauth2, python, authentication, api
Prerequisites
Python basics, HTTP requests, JSON
#!/usr/bin/env python3
"""
OAuth 2.0 Client Credentials Flow Implementation
Used for machine-to-machine authentication where no user interaction is required.
"""
import base64
import secrets
import time
import requests
from typing import Dict, Optional, Any
from urllib.parse import urlencode
class OAuth2ClientCredentials:
    """OAuth 2.0 Client Credentials Grant implementation.

    Fetches an access token with HTTP Basic client authentication, caches it
    until shortly before it expires, and attaches it to outgoing requests.
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_endpoint: str,
        scope: Optional[str] = None,
        revoke_endpoint: Optional[str] = None,
        timeout: Optional[float] = 30.0,
    ):
        """
        Args:
            client_id: Client identifier.
            client_secret: Client secret.
            token_endpoint: URL of the token endpoint.
            scope: Optional space-separated scopes to request.
            revoke_endpoint: URL of the revocation endpoint. When omitted it is
                derived from ``token_endpoint`` by replacing its last
                ``/token`` with ``/revoke``.
            timeout: Timeout in seconds applied to token and revocation
                requests, and to API requests that do not pass their own
                ``timeout``. ``None`` disables the timeout.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint
        self.scope = scope
        self.revoke_endpoint = revoke_endpoint or self._derive_revoke_endpoint(token_endpoint)
        self.timeout = timeout
        self._access_token = None
        self._token_expires_at = 0
        self._refresh_token = None
        # Filled in by _fetch_new_token(); initialised here so the attributes
        # exist before the first token has been fetched.
        self._token_type = None
        self._token_scope = None

    @staticmethod
    def _derive_revoke_endpoint(token_endpoint: str) -> str:
        """Replace only the last ``/token`` in the URL with ``/revoke``.

        Replacing every occurrence would also rewrite earlier path segments
        such as ``/token-service``. A URL without ``/token`` is returned
        unchanged.
        """
        head, sep, tail = token_endpoint.rpartition('/token')
        if not sep:
            return token_endpoint
        return f"{head}/revoke{tail}"

    def _get_basic_auth_header(self) -> str:
        """Create the HTTP Basic ``Authorization`` header value for this client."""
        credentials = f"{self.client_id}:{self.client_secret}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
        return f"Basic {encoded_credentials}"

    def _is_token_expired(self) -> bool:
        """Check if the current token is expired or will expire within 60 seconds."""
        return time.time() >= (self._token_expires_at - 60)  # 60-second buffer

    def get_access_token(self, force_refresh: bool = False) -> str:
        """
        Get a valid access token, fetching a new one if necessary.

        Args:
            force_refresh: Fetch a new token even if the cached one is still valid

        Returns:
            Valid access token
        """
        if not force_refresh and self._access_token and not self._is_token_expired():
            return self._access_token
        return self._fetch_new_token()

    def _fetch_new_token(self) -> str:
        """Fetch a new access token from the authorization server and cache it.

        Raises:
            requests.exceptions.RequestException: re-raised after logging when
                the token request fails.
        """
        token_data = {
            'grant_type': 'client_credentials'
        }
        if self.scope:
            token_data['scope'] = self.scope
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': self._get_basic_auth_header()
        }
        try:
            response = requests.post(
                self.token_endpoint,
                data=token_data,
                headers=headers,
                timeout=self.timeout
            )
            response.raise_for_status()
            token_info = response.json()
            self._access_token = token_info['access_token']
            # Set token expiration (default to 1 hour if not specified)
            expires_in = token_info.get('expires_in', 3600)
            self._token_expires_at = time.time() + expires_in
            # Store additional token info
            self._token_type = token_info.get('token_type', 'Bearer')
            self._token_scope = token_info.get('scope', self.scope)
            print(f"Successfully obtained new access token. Expires in {expires_in} seconds.")
            return self._access_token
        except requests.exceptions.RequestException as e:
            print(f"Error fetching token: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response content: {e.response.text}")
            raise

    def make_authenticated_request(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> "requests.Response":
        """
        Make an authenticated API request.

        A 401 response triggers one forced token refresh and one retry.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            url: Request URL
            **kwargs: Additional arguments passed to requests

        Returns:
            requests.Response object

        Raises:
            requests.exceptions.HTTPError: from ``raise_for_status()`` when the
                final response has an error status.
        """
        # Ensure we have a valid token
        token = self.get_access_token()
        # Work on a copy so the caller's headers dict is never modified.
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Authorization'] = f"Bearer {token}"
        # Apply the client-wide timeout unless the caller chose one.
        kwargs.setdefault('timeout', self.timeout)
        # Make the request
        response = requests.request(method, url, headers=headers, **kwargs)
        # If we get a 401, try refreshing the token once
        if response.status_code == 401:
            print("Token expired, refreshing...")
            token = self.get_access_token(force_refresh=True)
            headers['Authorization'] = f"Bearer {token}"
            response = requests.request(method, url, headers=headers, **kwargs)
        response.raise_for_status()
        return response

    def get(self, url: str, **kwargs) -> "requests.Response":
        """Make authenticated GET request"""
        return self.make_authenticated_request('GET', url, **kwargs)

    def post(self, url: str, **kwargs) -> "requests.Response":
        """Make authenticated POST request"""
        return self.make_authenticated_request('POST', url, **kwargs)

    def put(self, url: str, **kwargs) -> "requests.Response":
        """Make authenticated PUT request"""
        return self.make_authenticated_request('PUT', url, **kwargs)

    def delete(self, url: str, **kwargs) -> "requests.Response":
        """Make authenticated DELETE request"""
        return self.make_authenticated_request('DELETE', url, **kwargs)

    def revoke_token(self) -> bool:
        """Revoke the current access token.

        Returns:
            True when there is no token to revoke or the revocation request
            succeeded; False when the request failed (the cached token is
            kept in that case).
        """
        if not self._access_token:
            return True
        try:
            revoke_data = {
                'token': self._access_token,
                'token_type_hint': 'access_token'
            }
            headers = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'Authorization': self._get_basic_auth_header()
            }
            response = requests.post(
                self.revoke_endpoint,
                data=revoke_data,
                headers=headers,
                timeout=self.timeout
            )
            response.raise_for_status()
            # Clear stored token
            self._access_token = None
            self._token_expires_at = 0
            print("Token successfully revoked")
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error revoking token: {e}")
            return False
# Example usage
if __name__ == "__main__":
# Configuration
config = {
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"token_endpoint": "https://auth.example.com/oauth/token",
"scope": "api:read api:write"
}
# Initialize OAuth2 client
client = OAuth2ClientCredentials(**config)
try:
# Get access token
token = client.get_access_token()
print(f"Access token: {token[:20]}...")
# Make authenticated API requests
api_base_url = "https://api.example.com"
# GET request
response = client.get(f"{api_base_url}/users")
users = response.json()
print(f"Found {len(users)} users")
# POST request
new_user = {
"name": "John Doe",
"email": "[email protected]"
}
response = client.post(f"{api_base_url}/users", json=new_user)
created_user = response.json()
print(f"Created user: {created_user['id']}")
# PUT request
updated_user = {**created_user, "name": "John Smith"}
response = client.put(f"{api_base_url}/users/{created_user['id']}", json=updated_user)
print("User updated successfully")
# DELETE request
response = client.delete(f"{api_base_url}/users/{created_user['id']}")
print("User deleted successfully")
# Revoke token when done
client.revoke_token()
except Exception as e:
print(f"Error: {e}")OAuth 2.0 Resource Server
Implementation of a protected API resource server that validates OAuth 2.0 tokens
Difficulty
8/10
Estimated time
45 min
Tags
oauth2, security, typescript, api, middleware
Prerequisites
Node.js/Express, JWT, TypeScript, OAuth 2.0 concepts
// OAuth 2.0 Resource Server Implementation
// Protects API endpoints with OAuth 2.0 token validation
import { createPublicKey } from 'crypto';
import express from 'express';
import jwt from 'jsonwebtoken';
import axios from 'axios';
import rateLimit from 'express-rate-limit';
// Configuration interface
/** Settings the resource server uses to validate incoming access tokens. */
interface OAuth2Config {
/** Expected `iss` value, passed to `jwt.verify` as `issuer`. */
issuer: string;
/** Audience this server accepts; checked by `jwt.verify` and against the token's `aud`. */
audience: string;
/** URL that tokens are POSTed to for introspection. */
introspectionEndpoint: string;
/** URL the JWKS signing keys are fetched from. */
jwksUri: string;
/** Scopes every token must carry; no scope check is made when omitted or empty. */
requiredScope?: string[];
}
// Token information interface
/**
 * Normalised result of validating a token, produced from either verified JWT
 * claims or an introspection response.
 */
interface TokenInfo {
  /** Whether the token is currently valid. */
  active: boolean;
  /** Space-separated scope string. */
  scope?: string;
  /** Client the token was issued to. */
  clientId?: string;
  username?: string;
  /** Value of the `exp` field. */
  expiresAt?: number;
  /** Value of the `iat` field. */
  issuedAt?: number;
  sub?: string;
  /**
   * Audience copied from the `aud` field. It may be a single string or a
   * list, and the audience check in this file handles both forms.
   */
  aud?: string | string[];
  iss?: string;
}
// JWT Key Store
/**
 * In-memory cache of the signing keys published at a JWKS URI, indexed by
 * key id (`kid`).
 */
class JWKSKeyStore {
  private keys: Map<string, any> = new Map();
  private lastRefresh: number = 0;
  private refreshInterval: number = 3600000; // 1 hour
  // Minimum time between refreshes triggered by an unknown kid. Without it,
  // every token carrying a made-up kid would cause a request to the JWKS URI.
  private missRefreshCooldown: number = 60000; // 1 minute

  constructor(private jwksUri: string) {}

  /**
   * Look up a key by id, refreshing the cache when it is older than the
   * refresh interval, or when the id is unknown and no refresh happened
   * within the cooldown period.
   *
   * @returns The JWK, or undefined when no key with that id is known.
   * @throws Error('Unable to refresh JWKS keys') when a refresh fails.
   */
  async getKey(kid: string): Promise<any> {
    if (Date.now() - this.lastRefresh > this.refreshInterval) {
      await this.refreshKeys();
    }

    const cached = this.keys.get(kid);
    if (cached) {
      return cached;
    }

    // Unknown kid: the keys may have been rotated, so refresh, unless the
    // cache was refreshed moments ago.
    if (Date.now() - this.lastRefresh < this.missRefreshCooldown) {
      return undefined;
    }
    await this.refreshKeys();
    return this.keys.get(kid);
  }

  /**
   * Fetch the JWKS document and replace the cache with its keys.
   * The new key set is assembled first and swapped in only on success, so a
   * failed or malformed response leaves the previously cached keys in place.
   */
  private async refreshKeys(): Promise<void> {
    try {
      const response = await axios.get(this.jwksUri);
      const jwks = response.data;
      if (!jwks || !Array.isArray(jwks.keys)) {
        throw new Error('JWKS response does not contain a "keys" array');
      }

      const fresh = new Map<string, any>();
      for (const jwk of jwks.keys) {
        // Keys without a kid cannot be looked up by getKey(); skip them.
        if (jwk && typeof jwk.kid === 'string') {
          fresh.set(jwk.kid, jwk);
        }
      }

      this.keys = fresh;
      this.lastRefresh = Date.now();
      console.log('JWKS keys refreshed successfully');
    } catch (error) {
      console.error('Failed to refresh JWKS keys:', error);
      throw new Error('Unable to refresh JWKS keys');
    }
  }
}
// OAuth 2.0 Authentication Middleware
/**
 * Express middleware factory that authenticates requests with OAuth 2.0
 * bearer tokens. Tokens that decode as JWTs with a `kid` header are verified
 * locally against the JWKS keys; everything else, and any JWT that fails
 * local verification, is sent to the introspection endpoint.
 */
class OAuth2Middleware {
  private keyStore: JWKSKeyStore;

  constructor(private config: OAuth2Config) {
    this.keyStore = new JWKSKeyStore(config.jwksUri);
  }

  /**
   * Main authentication middleware.
   *
   * Responds 401 when the token is missing, inactive, or validation throws,
   * and 403 when the audience or the configured required scopes do not
   * match. On success `req.user` is set to `{ sub, username, clientId, scope }`
   * with `scope` split into an array.
   */
  authenticate() {
    return async (req: express.Request, res: express.Response, next: express.NextFunction) => {
      try {
        const token = this.extractToken(req);
        if (!token) {
          return res.status(401).json({ error: 'Missing access token' });
        }

        const tokenInfo = await this.validateToken(token);
        if (!tokenInfo.active) {
          return res.status(401).json({ error: 'Invalid or expired token' });
        }
        if (!this.validateAudience(tokenInfo)) {
          return res.status(403).json({ error: 'Invalid audience' });
        }
        if (!this.validateScope(tokenInfo)) {
          return res.status(403).json({ error: 'Insufficient scope' });
        }

        // Attach token info to request
        (req as any).user = {
          sub: tokenInfo.sub,
          username: tokenInfo.username,
          clientId: tokenInfo.clientId,
          scope: tokenInfo.scope?.split(' ') || []
        };
        next();
      } catch (error) {
        console.error('Authentication error:', error);
        res.status(401).json({ error: 'Authentication failed' });
      }
    };
  }

  /**
   * Scope-based authorization middleware; must run after authenticate().
   * Responds 403 unless `req.user.scope` contains every required scope.
   */
  requireScope(requiredScopes: string[]) {
    return (req: express.Request, res: express.Response, next: express.NextFunction) => {
      const user = (req as any).user;
      if (!user || !user.scope) {
        return res.status(403).json({ error: 'No scope information available' });
      }

      const hasRequiredScope = requiredScopes.every(scope =>
        user.scope.includes(scope)
      );
      if (!hasRequiredScope) {
        return res.status(403).json({
          error: 'Insufficient scope',
          required: requiredScopes,
          provided: user.scope
        });
      }
      next();
    };
  }

  /**
   * Read the bearer token from the Authorization header.
   * Returns null unless the header is exactly "<scheme> <token>" with a
   * non-empty token and the scheme "Bearer", compared case-insensitively.
   */
  private extractToken(req: express.Request): string | null {
    const authHeader = req.headers.authorization;
    if (!authHeader) {
      return null;
    }

    const parts = authHeader.split(' ');
    if (parts.length !== 2 || parts[0].toLowerCase() !== 'bearer' || !parts[1]) {
      return null;
    }
    return parts[1];
  }

  /**
   * Validate a token: local JWT verification when the token decodes with a
   * `kid` header, introspection otherwise or when local verification throws.
   */
  private async validateToken(token: string): Promise<TokenInfo> {
    try {
      // Try to validate as JWT first
      const decoded = jwt.decode(token, { complete: true });
      if (decoded && typeof decoded === 'object' && decoded.header.kid) {
        return await this.validateJWT(token, decoded);
      } else {
        // Fallback to introspection
        return await this.introspectToken(token);
      }
    } catch (error) {
      // If JWT validation fails, try introspection
      return await this.introspectToken(token);
    }
  }

  /**
   * Verify the JWT signature, issuer and audience with the key named by the
   * token's `kid`, and map the verified claims to TokenInfo.
   *
   * @throws When the key is unknown, verification fails, or the verified
   *   payload is not a JSON object.
   */
  private async validateJWT(token: string, decoded: any): Promise<TokenInfo> {
    const header = decoded.header;

    // Get signing key
    const key = await this.keyStore.getKey(header.kid);
    if (!key) {
      throw new Error('Signing key not found');
    }

    // Verify JWT signature and claims
    const verified = jwt.verify(token, this.getPublicKey(key), {
      issuer: this.config.issuer,
      audience: this.config.audience
    });
    // jwt.verify is typed as returning a string or a claims object; only an
    // object carries the claims read below.
    if (typeof verified === 'string') {
      throw new Error('JWT payload is not a JSON object');
    }

    return {
      active: true,
      sub: verified.sub,
      scope: verified.scope,
      clientId: verified.client_id || verified.azp,
      username: verified.preferred_username || verified.username,
      expiresAt: verified.exp,
      issuedAt: verified.iat,
      aud: verified.aud,
      iss: verified.iss
    };
  }

  /**
   * Ask the introspection endpoint about the token, authenticating with the
   * RESOURCE_SERVER_CLIENT_ID / RESOURCE_SERVER_CLIENT_SECRET environment
   * variables (empty strings when unset).
   */
  private async introspectToken(token: string): Promise<TokenInfo> {
    // Note: This requires client credentials for the resource server
    // In a real implementation, these should be stored securely
    const response = await axios.post(this.config.introspectionEndpoint,
      new URLSearchParams({
        token: token,
        token_type_hint: 'access_token'
      }),
      {
        auth: {
          username: process.env.RESOURCE_SERVER_CLIENT_ID || '',
          password: process.env.RESOURCE_SERVER_CLIENT_SECRET || ''
        },
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded'
        }
      }
    );

    const data = response.data;
    return {
      active: data.active,
      sub: data.sub,
      scope: data.scope,
      clientId: data.client_id,
      username: data.username,
      expiresAt: data.exp,
      issuedAt: data.iat,
      aud: data.aud,
      iss: data.iss
    };
  }

  /** True when the token's audience (string or list) includes the configured audience. */
  private validateAudience(tokenInfo: TokenInfo): boolean {
    if (!tokenInfo.aud) return false;
    const audiences = Array.isArray(tokenInfo.aud) ? tokenInfo.aud : [tokenInfo.aud];
    return audiences.includes(this.config.audience);
  }

  /** True when no scopes are required, or the token carries every required scope. */
  private validateScope(tokenInfo: TokenInfo): boolean {
    if (!this.config.requiredScope || this.config.requiredScope.length === 0) {
      return true;
    }
    if (!tokenInfo.scope) return false;

    const tokenScopes = tokenInfo.scope.split(' ');
    return this.config.requiredScope.every(required =>
      tokenScopes.includes(required)
    );
  }

  /**
   * Convert a JWK to a PEM-encoded public key (SPKI).
   *
   * The conversion is delegated to Node's crypto module. Concatenating a
   * fixed base64 header with separately base64-encoded modulus and exponent
   * does not produce valid DER, so the key could never be parsed.
   */
  private getPublicKey(jwk: any): string {
    return createPublicKey({ key: jwk, format: 'jwk' })
      .export({ type: 'spki', format: 'pem' })
      .toString();
  }
}
// Example API Server Setup
/**
 * Build the example Express application: JSON body parsing, rate limiting,
 * an unauthenticated health check, and `/api` routes protected by
 * OAuth2Middleware. Configuration is read from OAUTH2_* environment
 * variables with example defaults.
 *
 * @returns The configured application; the caller starts listening.
 */
function createResourceServer(): express.Application {
  const app = express();

  // Configuration
  const oauth2Config: OAuth2Config = {
    issuer: process.env.OAUTH2_ISSUER || 'https://auth.example.com',
    audience: process.env.OAUTH2_AUDIENCE || 'api://my-resource-server',
    introspectionEndpoint: process.env.OAUTH2_INTROSPECTION_ENDPOINT || 'https://auth.example.com/introspect',
    jwksUri: process.env.OAUTH2_JWKS_URI || 'https://auth.example.com/.well-known/jwks.json',
    requiredScope: ['api:read']
  };
  const oauth2 = new OAuth2Middleware(oauth2Config);

  // Middleware
  app.use(express.json());

  // Rate limiting
  const limiter = rateLimit({
    windowMs: 15 * 60 * 1000, // 15 minutes
    max: 100 // limit each IP to 100 requests per windowMs
  });
  app.use(limiter);

  // Health check (no auth required)
  app.get('/health', (req, res) => {
    res.json({ status: 'healthy', timestamp: new Date().toISOString() });
  });

  // Protected routes: everything on this router goes through authenticate()
  const apiRouter = express.Router();
  apiRouter.use(oauth2.authenticate());

  // User profile - requires authentication
  apiRouter.get('/profile', (req, res) => {
    const user = (req as any).user;
    res.json({
      sub: user.sub,
      username: user.username,
      clientId: user.clientId,
      scopes: user.scope
    });
  });

  // Read users - requires api:read scope
  apiRouter.get('/users', oauth2.requireScope(['api:read']), (req, res) => {
    // Mock data.
    // NOTE(review): the addresses in the published sample were replaced by a
    // scraper placeholder; these example.com addresses are stand-ins.
    const users = [
      { id: 1, name: 'John Doe', email: 'john.doe@example.com' },
      { id: 2, name: 'Jane Smith', email: 'jane.smith@example.com' }
    ];
    res.json(users);
  });

  // Create user - requires api:write scope
  apiRouter.post('/users',
    oauth2.requireScope(['api:write']),
    (req, res) => {
      const { name, email } = req.body;
      if (!name || !email) {
        return res.status(400).json({ error: 'Name and email are required' });
      }

      // Mock user creation
      const newUser = {
        id: Date.now(),
        name,
        email,
        createdAt: new Date().toISOString()
      };
      res.status(201).json(newUser);
    }
  );

  // Admin endpoint - requires admin scope
  apiRouter.get('/admin/stats',
    oauth2.requireScope(['api:admin']),
    (req, res) => {
      res.json({
        totalUsers: 1000,
        activeUsers: 850,
        totalRequests: 50000,
        uptime: '99.9%'
      });
    }
  );

  app.use('/api', apiRouter);

  // Error handling. Express recognises an error handler by its four
  // parameters, so `next` stays in the signature although it is unused.
  app.use((err: Error, req: express.Request, res: express.Response, next: express.NextFunction) => {
    console.error('Unhandled error:', err);
    res.status(500).json({ error: 'Internal server error' });
  });

  return app;
}
// Start server
// Only start listening when this file is the program entry point, not when
// it is imported by another module.
if (require.main === module) {
  const server = createResourceServer();
  // PORT from the environment wins; 3000 is the fallback.
  const listenPort = process.env.PORT || 3000;
  server.listen(listenPort, () => {
    console.log(`Resource server listening on port ${listenPort}`);
  });
}
export { OAuth2Middleware, createResourceServer, OAuth2Config };
Tools
Tools frequently paired with this sample
Related