Streamlit 示例
Python 数据应用框架示例,用最少的代码为数据科学和机器学习创建交互式 Web 应用
💻 Hello World 基础 python
🟢 simple
Streamlit应用快速入门示例
# Streamlit Hello World Examples
# Quick start with basic Streamlit features
import streamlit as st
import pandas as pd
import numpy as np
import time
import random
from datetime import datetime, timedelta
# 1. Basic Streamlit App Structure
def basic_app():
    """Simple hello world app.

    Renders a greeting page with a name text input, an age slider and a
    button that triggers a balloon animation. Minimal end-to-end example.
    """
    st.set_page_config(
        page_title="Hello World",
        page_icon="👋",
        layout="centered"
    )
    st.title("👋 Hello, Streamlit!")
    st.write("Welcome to your first Streamlit application")
    # Basic input widgets
    name = st.text_input("What's your name?")
    # Only greet once the user has typed something (empty string is falsy)
    if name:
        st.success(f"Hello, {name}! 👋")
    age = st.slider("Your age", 0, 100, 25)
    st.write(f"You are {age} years old")
    # Button interaction: True only on the rerun triggered by the click
    if st.button("Click me!"):
        st.balloons()
        st.write("🎉 Button clicked!")
# 2. Interactive Elements App
def interactive_app():
    """App with various interactive elements.

    Demonstrates sidebar controls, a two-column layout, text/date/time
    inputs, raw-HTML rendering and optional sample charts.
    """
    st.set_page_config(
        page_title="Interactive Elements",
        page_icon="🎮",
        layout="wide"
    )
    st.header("🎮 Interactive Elements Demo")
    # Sidebar for controls
    st.sidebar.title("Control Panel")
    theme = st.sidebar.selectbox("Choose theme", ["Light", "Dark"])
    show_charts = st.sidebar.checkbox("Show charts", value=True)
    num_points = st.sidebar.slider("Number of data points", 10, 100, 50)
    # Main content: two side-by-side columns
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("User Input")
        # Text inputs
        user_text = st.text_area("Enter some text", "Type here...")
        word_count = len(user_text.split()) if user_text else 0
        st.write(f"Word count: {word_count}")
        # Selectbox and multiselect
        fruit = st.selectbox("Favorite fruit", ["Apple", "Banana", "Orange", "Grape"])
        colors = st.multiselect("Choose colors", ["Red", "Blue", "Green", "Yellow"])
        # Date and time
        selected_date = st.date_input("Select a date")
        selected_time = st.time_input("Select a time")
    with col2:
        st.subheader("Your Selections")
        st.write(f"**Theme:** {theme}")
        st.write(f"**Fruit:** {fruit}")
        st.write(f"**Colors:** {', '.join(colors) if colors else 'None'}")
        st.write(f"**Date:** {selected_date}")
        st.write(f"**Time:** {selected_time}")
        # Render each chosen color as a colored box; the lower-cased
        # names double as CSS color keywords in the inline style
        if colors:
            for color in colors:
                st.markdown(f'<div style="background-color: {color.lower()}; padding: 10px; margin: 5px; border-radius: 5px;">{color}</div>', unsafe_allow_html=True)
    # Charts section (toggled from the sidebar)
    if show_charts:
        st.subheader("📊 Sample Data Visualization")
        generate_sample_charts(num_points)
# 3. Data Processing App
def data_processing_app():
    """App for basic data processing.

    Lets the user upload a CSV, then shows shape/missing-value metrics,
    a row-limited preview and optional per-column statistics. When no
    file is uploaded, offers a generated sample table instead.
    """
    st.set_page_config(
        page_title="Data Processing",
        page_icon="📊",
        layout="wide"
    )
    st.header("📊 Data Processing Demo")
    # File upload
    uploaded_file = st.file_uploader(
        "Choose a CSV file",
        type=["csv"],
        help="Upload a CSV file to analyze"
    )
    if uploaded_file is not None:
        # Load data directly from the in-memory upload buffer
        data = pd.read_csv(uploaded_file)
        st.success(f"File loaded successfully! Shape: {data.shape}")
        # Data info
        st.subheader("📋 Data Information")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Rows", data.shape[0])
        with col2:
            st.metric("Columns", data.shape[1])
        with col3:
            st.metric("Missing Values", data.isnull().sum().sum())
        # Data preview
        st.subheader("👀 Data Preview")
        show_rows = st.slider("Number of rows to show", 5, 50, 10)
        st.dataframe(data.head(show_rows))
        # Column operations
        if st.checkbox("Show column operations"):
            selected_column = st.selectbox("Select column", data.columns)
            if selected_column:
                st.subheader(f"📈 Analysis for {selected_column}")
                # Numeric columns get describe() + a value-count chart;
                # everything else just gets value counts
                if data[selected_column].dtype in ['int64', 'float64']:
                    st.write("**Statistics:**")
                    st.write(data[selected_column].describe())
                    # Histogram (approximated via value counts)
                    st.bar_chart(data[selected_column].value_counts())
                else:
                    st.write("**Value Counts:**")
                    st.write(data[selected_column].value_counts())
    else:
        st.info("Please upload a CSV file to get started")
        # Generate sample data as a fallback demonstration
        if st.button("Generate sample data"):
            sample_data = generate_sample_data()
            st.write("Sample data generated:")
            st.dataframe(sample_data)
# 4. Machine Learning Demo
def ml_demo_app():
    """Entry page for the ML demos: lets the user pick one and runs it."""
    st.set_page_config(
        page_title="ML Demo",
        page_icon="🤖",
        layout="wide"
    )
    st.header("🤖 Simple Machine Learning Demo")
    model_type = st.selectbox(
        "Choose model type",
        ["Linear Regression", "Classification", "Clustering"]
    )
    # Dispatch table replaces the if/elif chain; any label other than the
    # two mapped ones falls through to clustering, exactly like the
    # original else branch.
    dispatch = {
        "Linear Regression": linear_regression_demo,
        "Classification": classification_demo,
    }
    dispatch.get(model_type, clustering_demo)()
def linear_regression_demo():
    """Linear regression demonstration.

    Fits y = theta0 + theta1*x to seeded synthetic data by ordinary
    least squares, then shows the fitted parameters, two sample
    predictions, and the regression line against the observations.
    """
    st.subheader("📈 Linear Regression")
    # Generate synthetic data: y ≈ 2x + 1 plus Gaussian noise
    np.random.seed(42)
    X = np.random.rand(100, 1) * 10
    y = 2 * X + 1 + np.random.randn(100, 1) * 0.5
    # Create dataframe for display
    df = pd.DataFrame({'X': X.flatten(), 'y': y.flatten()})
    # FIX: solve the least-squares problem with lstsq instead of
    # explicitly inverting X^T X (the normal equations). lstsq is
    # numerically stabler and handles ill-conditioned / singular design
    # matrices; the solution is the same for this well-posed problem.
    X_b = np.c_[np.ones((100, 1)), X]  # add x0 = 1 (bias column)
    theta_best, *_ = np.linalg.lstsq(X_b, y, rcond=None)
    # Make predictions at the endpoints of the input range
    X_new = np.array([[0], [10]])
    X_new_b = np.c_[np.ones((2, 1)), X_new]
    y_predict = X_new_b.dot(theta_best)
    # Display results
    col1, col2 = st.columns(2)
    with col1:
        st.write("**Training Data:**")
        st.dataframe(df.head(10))
        st.write("**Model Parameters:**")
        st.write(f"Intercept: {theta_best[0][0]:.3f}")
        st.write(f"Slope: {theta_best[1][0]:.3f}")
    with col2:
        st.write("**Predictions:**")
        # (dropped an unused enumerate index from the original loop)
        for x_val, y_val in zip(X_new.flatten(), y_predict.flatten()):
            st.write(f"X = {x_val:.1f} → y = {y_val:.2f}")
    # Chart: observed points overlaid with the fitted line
    st.write("**Regression Line:**")
    chart_data = pd.DataFrame({
        'X': X.flatten(),
        'Actual': y.flatten(),
        'Predicted': np.dot(X_b, theta_best).flatten()
    })
    st.line_chart(chart_data.set_index('X'))
def classification_demo():
    """Simple classification demonstration.

    Generates two correlated 2-D Gaussian clusters and classifies
    interactively with a hand-written threshold rule (no model fitting).
    """
    st.subheader("🎯 Classification Demo")
    # Generate sample data (seeded for reproducibility)
    np.random.seed(42)
    n_samples = 200
    # Two classes drawn from 2-D Gaussians with different centers
    class_0 = np.random.multivariate_normal([2, 2], [[1, 0.5], [0.5, 1]], n_samples//2)
    class_1 = np.random.multivariate_normal([4, 4], [[1, -0.5], [-0.5, 1]], n_samples//2)
    X = np.vstack([class_0, class_1])
    y = np.hstack([np.zeros(n_samples//2), np.ones(n_samples//2)])
    # Create dataframe
    df = pd.DataFrame({'Feature 1': X[:, 0], 'Feature 2': X[:, 1], 'Class': y})
    # Display data
    col1, col2 = st.columns(2)
    with col1:
        st.write("**Sample Data:**")
        st.dataframe(df.head(10))
        # Simple rule-based classifier: class 1 iff features sum past 6
        # (the midpoint between the two cluster centers along x1 + x2)
        def simple_classifier(x1, x2):
            return 1 if (x1 + x2) > 6 else 0
        # Test classifier on user-chosen feature values
        test_x1 = st.slider("Feature 1 value", 0, 6, 3)
        test_x2 = st.slider("Feature 2 value", 0, 6, 3)
        prediction = simple_classifier(test_x1, test_x2)
        st.write(f"**Prediction:** {'Class 1' if prediction == 1 else 'Class 0'}")
    with col2:
        st.write("**Class Distribution:**")
        class_counts = df['Class'].value_counts()
        st.write(f"Class 0: {class_counts[0]} samples")
        st.write(f"Class 1: {class_counts[1]} samples")
        # Simple per-class count chart
        st.write("**Data Visualization:**")
        chart_data = df.groupby('Class').size()
        st.bar_chart(chart_data)
def clustering_demo():
    """Simple clustering demonstration.

    Generates three seeded Gaussian blobs, shows the points, per-cluster
    statistics, and the true cluster centers.
    """
    st.subheader("🔗 Clustering Demo")
    # Generate sample data: 30 points around each of three centers
    np.random.seed(42)
    centers = [[1, 1], [5, 5], [3, 7]]
    points = []
    labels = []
    for i, center in enumerate(centers):
        cluster_points = np.random.randn(30, 2) + center
        points.extend(cluster_points)
        labels.extend([f'Cluster {i+1}'] * 30)
    df = pd.DataFrame({
        'X': [point[0] for point in points],
        'Y': [point[1] for point in points],
        'Cluster': labels
    })
    # Display data
    st.write("**Cluster Data:**")
    st.dataframe(df.head(15))
    # Cluster statistics
    st.write("**Cluster Statistics:**")
    # BUG FIX: the original passed {'Cluster': 'count'} to agg() inside
    # groupby('Cluster'). Modern pandas excludes the grouping key from
    # the aggregation frame, so that raises a KeyError. Use named
    # aggregation and count a data column instead.
    cluster_stats = df.groupby('Cluster').agg(
        X_mean=('X', 'mean'),
        X_std=('X', 'std'),
        Y_mean=('Y', 'mean'),
        Y_std=('Y', 'std'),
        Count=('X', 'count'),
    ).round(3)
    st.dataframe(cluster_stats)
    # Visualization of the true generating centers
    st.write("**Cluster Centers:**")
    for i, center in enumerate(centers):
        st.metric(f"Cluster {i+1}", f"({center[0]:.1f}, {center[1]:.1f})")
# 5. Real-time Updates Demo
def realtime_demo():
    """Real-time updating demo.

    Streams 20 one-second updates (progress bar, random chart, metrics)
    into placeholders; start/stop state lives in st.session_state.
    """
    st.set_page_config(
        page_title="Real-time Demo",
        page_icon="⏱️",
        layout="centered"
    )
    st.header("⏱️ Real-time Updates Demo")
    # Placeholders so each iteration replaces (not appends to) the page
    placeholder = st.empty()
    # Metrics placeholder
    metrics_placeholder = st.empty()
    # Start/stop state survives reruns via session_state
    if 'running' not in st.session_state:
        st.session_state.running = False
    if st.button("Start Real-time Updates"):
        st.session_state.running = True
    if st.button("Stop Real-time Updates"):
        st.session_state.running = False
    # Real-time update loop
    if st.session_state.running:
        total_iterations = 20
        for i in range(total_iterations):
            # Generate random data for this tick
            current_time = datetime.now().strftime("%H:%M:%S")
            value = random.randint(1, 100)
            # Update placeholder
            with placeholder.container():
                st.subheader(f"Live Update - {current_time}")
                # BUG FIX: use (i + 1) so the bar reaches 100% on the
                # final iteration (the original i/20 topped out at 95%
                # while the success message claimed completion).
                progress = (i + 1) / total_iterations
                st.progress(progress)
                # Show a growing random chart
                chart_data = pd.DataFrame({
                    'Time': [f"T{j}" for j in range(i+1)],
                    'Value': [random.randint(50, 150) for _ in range(i+1)]
                })
                st.line_chart(chart_data.set_index('Time'))
            # Update metrics
            with metrics_placeholder.container():
                col1, col2, col3 = st.columns(3)
                col1.metric("Current Value", value, f"{value - 50}")
                col2.metric("Progress", f"{progress:.1%}")
                # (i + 1) seconds have elapsed after the sleep below
                col3.metric("Time Elapsed", f"{i + 1}s")
            time.sleep(1)
            # NOTE(review): session_state only changes on a script rerun,
            # so this flag cannot flip mid-loop; the Stop button takes
            # effect on the next run, not during this one.
            if not st.session_state.running:
                break
        st.session_state.running = False
        st.success("Real-time updates completed!")
# 6. Utility Functions
def generate_sample_charts(num_points):
    """Render three sample charts (line, bar, area) from random data.

    Args:
        num_points: number of daily observations to generate.
    """
    # Random business-like series indexed by the most recent dates
    dates = pd.date_range(end=datetime.now(), periods=num_points)
    data = pd.DataFrame({
        'date': dates,
        # The sine term adds a visible cyclical pattern on top of noise
        'sales': np.random.normal(1000, 200, num_points) + np.sin(range(num_points)) * 100,
        'customers': np.random.poisson(50, num_points),
        'revenue': np.random.normal(5000, 1000, num_points)
    })
    # Line chart
    st.subheader("📈 Sales Trend")
    st.line_chart(data.set_index('date')['sales'])
    # Bar chart
    st.subheader("👥 Customer Count")
    st.bar_chart(data.set_index('date')['customers'])
    # Area chart
    st.subheader("💰 Revenue")
    st.area_chart(data.set_index('date')['revenue'])
def generate_sample_data():
    """Build a 100-row synthetic sales table.

    The RNG is seeded, so the numeric columns are identical on every
    call; only the date column depends on the current clock.

    Returns:
        DataFrame with date/product/sales/quantity/price columns.
    """
    np.random.seed(42)
    row_count = 100
    frame = pd.DataFrame({
        'date': pd.date_range(end=datetime.now(), periods=row_count),
        'product': np.random.choice(['A', 'B', 'C', 'D'], row_count),
        'sales': np.random.normal(1000, 200, row_count).astype(int),
        'quantity': np.random.poisson(20, row_count),
        'price': np.random.uniform(10, 100, row_count).round(2),
    })
    return frame
# 7. Main App Selection
def main():
    """Main application selector: routes to one of the example apps."""
    # NOTE(review): each sub-app called below also invokes
    # st.set_page_config after this one; Streamlit normally permits only
    # one such call per run -- confirm this doesn't raise at runtime.
    st.set_page_config(
        page_title="Streamlit Examples",
        page_icon="🎯",
        layout="centered",
        initial_sidebar_state="expanded"
    )
    # App selection in sidebar
    st.sidebar.title("📚 Streamlit Examples")
    app_choice = st.sidebar.selectbox(
        "Choose an example:",
        [
            "Hello World",
            "Interactive Elements",
            "Data Processing",
            "Machine Learning Demo",
            "Real-time Updates"
        ]
    )
    # Run selected app
    if app_choice == "Hello World":
        basic_app()
    elif app_choice == "Interactive Elements":
        interactive_app()
    elif app_choice == "Data Processing":
        data_processing_app()
    elif app_choice == "Machine Learning Demo":
        ml_demo_app()
    elif app_choice == "Real-time Updates":
        realtime_demo()
    # Footer
    st.sidebar.markdown("---")
    st.sidebar.markdown("### About")
    st.sidebar.info("This is a collection of Streamlit examples demonstrating various features and capabilities.")


if __name__ == "__main__":
    main()
💻 数据仪表板 python
🟡 intermediate
创建交互式数据可视化仪表板
# Interactive Data Dashboard with Streamlit
# Complete business intelligence dashboard
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import datetime
from datetime import timedelta
import random
# 1. Page Configuration
# Top-level page setup for the dashboard script (title, icon, wide layout).
st.set_page_config(
    page_title="Business Intelligence Dashboard",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded"
)
# 2. Data Generation Functions
def generate_sales_data(start_date, end_date):
    """Generate realistic sales data.

    Produces one row per (day, product) with weekday and seasonal
    effects applied to a noisy product-specific baseline. The RNG is
    not seeded, so every call yields different numbers.

    Args:
        start_date: first day of the range (inclusive).
        end_date: last day of the range (inclusive).

    Returns:
        DataFrame with date/product/sales/customers/region/channel columns.
    """
    products = ['Electronics', 'Clothing', 'Food', 'Books', 'Home']
    rows = []
    for day in pd.date_range(start=start_date, end=end_date):
        # Weekends sell roughly 30% less than weekdays
        weekday_factor = 0.7 if day.weekday() >= 5 else 1.0
        # Smooth yearly seasonality (+/-30%) keyed on day of year
        day_of_year = day.timetuple().tm_yday
        seasonal_factor = 1 + 0.3 * np.sin(2 * np.pi * day_of_year / 365)
        for product in products:
            # Noisy baseline per category. (All five draws happen on
            # every pass before one is selected, matching the original.)
            base_sales = {
                'Electronics': np.random.normal(5000, 1000),
                'Clothing': np.random.normal(3000, 500),
                'Food': np.random.normal(2000, 300),
                'Books': np.random.normal(1500, 200),
                'Home': np.random.normal(2500, 400),
            }[product]
            sales = base_sales * weekday_factor * seasonal_factor * np.random.uniform(0.8, 1.2)
            customers = int(sales / 50) + np.random.poisson(10)
            rows.append({
                'date': day,
                'product': product,
                'sales': max(0, sales),
                'customers': max(0, customers),
                'region': np.random.choice(['North', 'South', 'East', 'West']),
                'channel': np.random.choice(['Online', 'Store', 'Phone']),
            })
    return pd.DataFrame(rows)
def generate_customer_data():
    """Generate customer analytics data.

    Returns 100 synthetic customers for each of four segments (400 rows
    total), with segment-dependent multipliers on lifetime value,
    orders, recency and satisfaction. Not seeded, so output varies.

    Returns:
        DataFrame with one row per synthetic customer.
    """
    rows = []
    for segment in ('New', 'Returning', 'VIP', 'Churned'):
        is_vip = segment == 'VIP'
        for _ in range(100):
            rows.append({
                'customer_id': f"C_{random.randint(10000, 99999)}",
                'segment': segment,
                # VIPs get a 3x lifetime-value boost
                'lifetime_value': np.random.lognormal(8, 1) * (3 if is_vip else 1),
                # New customers have barely ordered yet
                'orders': np.random.poisson(10 if segment != 'New' else 1),
                'avg_order_value': np.random.normal(100, 30) * (1.5 if is_vip else 1),
                # Churned customers haven't ordered in a long time
                'days_since_last_order': np.random.exponential(30) * (2 if segment == 'Churned' else 0.5),
                'satisfaction_score': np.random.uniform(3, 5) * (1.2 if is_vip else 1),
            })
    return pd.DataFrame(rows)
# 3. Dashboard Components
def create_kpi_metrics(df):
    """Create KPI metrics display.

    Renders four headline metrics (total sales, total customers,
    average order value, top product) across a 4-column row.

    Args:
        df: sales DataFrame with 'sales', 'customers' and 'product' columns.
    """
    # Calculate metrics
    total_sales = df['sales'].sum()
    total_customers = df['customers'].sum()
    # Guard against a zero-customer period to avoid division by zero
    avg_order_value = df['sales'].sum() / df['customers'].sum() if df['customers'].sum() > 0 else 0
    # Growth deltas are mocked with random values for the demo
    sales_growth = np.random.uniform(-10, 25)
    customer_growth = np.random.uniform(-5, 15)
    aov_growth = np.random.uniform(-8, 12)
    # Display metrics
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric(
            label="Total Sales",
            value=f"${total_sales:,.0f}",
            delta=f"{sales_growth:.1f}% vs last period"
        )
    with col2:
        st.metric(
            label="Total Customers",
            value=f"{total_customers:,.0f}",
            delta=f"{customer_growth:.1f}% vs last period"
        )
    with col3:
        st.metric(
            label="Avg Order Value",
            value=f"${avg_order_value:.2f}",
            delta=f"{aov_growth:.1f}% vs last period"
        )
    with col4:
        # Product with the highest total sales in the filtered period
        top_product = df.groupby('product')['sales'].sum().idxmax()
        st.metric(
            label="Top Product",
            value=top_product,
            delta="Best performer"
        )
def create_sales_charts(df):
    """Create sales visualization charts.

    Renders a daily-sales line chart (with a 7-day moving average
    overlay), a per-product bar chart and a per-region pie chart.

    Args:
        df: sales DataFrame with date/product/region/sales columns.
    """
    st.subheader("📈 Sales Performance")
    # Time series chart of daily totals
    daily_sales = df.groupby('date')['sales'].sum().reset_index()
    fig_time = px.line(
        daily_sales,
        x='date',
        y='sales',
        title='Sales Trend Over Time',
        labels={'sales': 'Sales ($)', 'date': 'Date'}
    )
    # Overlay a 7-day moving average to smooth out daily noise
    fig_time.add_scatter(
        x=daily_sales['date'],
        y=daily_sales['sales'].rolling(window=7).mean(),
        mode='lines',
        name='7-day Moving Average',
        line=dict(dash='dash')
    )
    st.plotly_chart(fig_time, use_container_width=True)
    # Product / region performance split into two columns
    col1, col2 = st.columns(2)
    with col1:
        product_sales = df.groupby('product')['sales'].sum().reset_index()
        fig_product = px.bar(
            product_sales,
            x='product',
            y='sales',
            title='Sales by Product Category',
            labels={'sales': 'Sales ($)', 'product': 'Product'}
        )
        st.plotly_chart(fig_product, use_container_width=True)
    with col2:
        region_sales = df.groupby('region')['sales'].sum().reset_index()
        fig_region = px.pie(
            region_sales,
            values='sales',
            names='region',
            title='Sales by Region'
        )
        st.plotly_chart(fig_region, use_container_width=True)
def create_customer_analytics(customer_df):
    """Create customer analytics visualizations.

    Renders the segment distribution, lifetime-value box plots per
    segment, and a per-segment metrics heatmap.

    Args:
        customer_df: DataFrame with segment, lifetime_value, orders,
            avg_order_value and satisfaction_score columns.
    """
    st.subheader("👥 Customer Analytics")
    col1, col2 = st.columns(2)
    with col1:
        # Customer segment distribution
        segment_counts = customer_df['segment'].value_counts()
        fig_segment = px.bar(
            x=segment_counts.index,
            y=segment_counts.values,
            title='Customer Segments Distribution',
            labels={'x': 'Segment', 'y': 'Number of Customers'}
        )
        st.plotly_chart(fig_segment, use_container_width=True)
    with col2:
        # Lifetime-value spread per segment; the box plot shows the full
        # distribution. (FIX: removed an unused `ltv_by_segment` mean
        # aggregation the original computed and never displayed.)
        fig_ltv = px.box(
            customer_df,
            x='segment',
            y='lifetime_value',
            title='Lifetime Value by Segment',
            labels={'lifetime_value': 'Lifetime Value ($)', 'segment': 'Segment'}
        )
        st.plotly_chart(fig_ltv, use_container_width=True)
    # Customer satisfaction heatmap of mean metrics per segment
    st.subheader("Customer Satisfaction Analysis")
    satisfaction_data = customer_df.groupby('segment').agg({
        'satisfaction_score': 'mean',
        'orders': 'mean',
        'avg_order_value': 'mean'
    }).round(2)
    fig_heatmap = go.Figure(data=go.Heatmap(
        z=satisfaction_data.values,
        x=satisfaction_data.columns,
        y=satisfaction_data.index,
        colorscale='RdYlBu',
        text=satisfaction_data.values,
        texttemplate="%{text}",
        textfont={"size": 10}
    ))
    fig_heatmap.update_layout(
        title='Customer Metrics Heatmap by Segment',
        xaxis_title='Metrics',
        yaxis_title='Customer Segment'
    )
    st.plotly_chart(fig_heatmap, use_container_width=True)
def create_performance_table(df):
    """Create detailed performance table.

    Aggregates sales and customers per (product, region) pair and shows
    the result with currency/number column formatting.

    Args:
        df: sales DataFrame with product/region/sales/customers columns.
    """
    st.subheader("📋 Detailed Performance Data")
    # Aggregated data: total + mean sales and total customers per group
    performance_data = df.groupby(['product', 'region']).agg({
        'sales': ['sum', 'mean'],
        'customers': 'sum'
    }).round(2)
    # Flatten the MultiIndex columns produced by the nested agg spec
    performance_data.columns = ['Total Sales', 'Avg Sales', 'Total Customers']
    performance_data = performance_data.reset_index()
    # Derived metric. NOTE(review): a group with zero customers would
    # yield inf here -- confirm that cannot occur upstream.
    performance_data['Avg Order Value'] = (performance_data['Total Sales'] / performance_data['Total Customers']).round(2)
    # Display table with per-column number formatting
    st.dataframe(
        performance_data,
        use_container_width=True,
        hide_index=True,
        column_config={
            "Total Sales": st.column_config.NumberColumn("Total Sales ($)", format="$%.0f"),
            "Avg Sales": st.column_config.NumberColumn("Avg Sales ($)", format="$%.2f"),
            "Total Customers": st.column_config.NumberColumn("Total Customers", format="%d"),
            "Avg Order Value": st.column_config.NumberColumn("Avg Order Value ($)", format="$%.2f")
        }
    )
def create_funnel_analysis(df):
    """Create sales funnel analysis.

    Renders a hard-coded conversion funnel plus per-stage conversion
    and drop-off percentages.

    Args:
        df: currently unused -- the funnel numbers below are mocked.
    """
    st.subheader("🔍 Sales Funnel Analysis")
    # Mock funnel data (in real app, this would come from actual funnel tracking)
    funnel_data = pd.DataFrame({
        'Stage': ['Visitors', 'Product Views', 'Add to Cart', 'Checkout', 'Purchase'],
        'Count': [10000, 3500, 1200, 800, 450],
        'Conversion Rate': [100, 35, 12, 8, 4.5]
    })
    # Funnel chart
    fig_funnel = go.Figure(go.Funnel(
        y=funnel_data['Stage'],
        x=funnel_data['Count'],
        textinfo="value+percent initial"
    ))
    fig_funnel.update_layout(
        title="Sales Conversion Funnel",
        yaxis={'categoryorder': 'array', 'categoryarray': ['Visitors', 'Product Views', 'Add to Cart', 'Checkout', 'Purchase']}
    )
    st.plotly_chart(fig_funnel, use_container_width=True)
    # Conversion rates table
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("**Conversion Rates:**")
        for _, row in funnel_data.iterrows():
            st.write(f"{row['Stage']}: {row['Conversion Rate']:.1f}%")
    with col2:
        st.markdown("**Drop-off Analysis:**")
        # Percentage lost between each consecutive pair of stages
        for i in range(1, len(funnel_data)):
            drop_off = ((funnel_data.iloc[i-1]['Count'] - funnel_data.iloc[i]['Count']) / funnel_data.iloc[i-1]['Count']) * 100
            st.write(f"{funnel_data.iloc[i]['Stage']}: {drop_off:.1f}% drop-off")
# 4. Main Dashboard Function
def main():
    """Main dashboard application.

    Wires up sidebar date/product/region/channel filters, loads cached
    synthetic data, applies the filters and renders KPI metrics plus
    four tabbed views with an export option.
    """
    # Header
    st.title("📊 Business Intelligence Dashboard")
    st.markdown("Real-time business analytics and insights")
    # Sidebar controls
    st.sidebar.title("Dashboard Controls")
    # Date range selector (defaults to the last 30 days)
    today = datetime.date.today()
    default_start = today - datetime.timedelta(days=30)
    start_date = st.sidebar.date_input(
        "Start Date",
        value=default_start,
        max_value=today
    )
    end_date = st.sidebar.date_input(
        "End Date",
        value=today,
        max_value=today,
        min_value=start_date
    )
    # Filters ('All' in the selection disables the corresponding filter)
    product_filter = st.sidebar.multiselect(
        "Product Filter",
        options=['All', 'Electronics', 'Clothing', 'Food', 'Books', 'Home'],
        default=['All']
    )
    region_filter = st.sidebar.multiselect(
        "Region Filter",
        options=['All', 'North', 'South', 'East', 'West'],
        default=['All']
    )
    # Channel filter
    channel_filter = st.sidebar.multiselect(
        "Channel Filter",
        options=['All', 'Online', 'Store', 'Phone'],
        default=['All']
    )
    # Refresh button forces a full script rerun
    if st.sidebar.button("🔄 Refresh Data"):
        st.rerun()
    # Generate or load data; cached so reruns within the TTL reuse it
    @st.cache_data(ttl=300)  # Cache for 5 minutes
    def load_data(start, end):
        sales_data = generate_sales_data(start, end)
        customer_data = generate_customer_data()
        return sales_data, customer_data
    sales_df, customer_df = load_data(start_date, end_date)
    # Apply filters
    filtered_df = sales_df.copy()
    if 'All' not in product_filter:
        filtered_df = filtered_df[filtered_df['product'].isin(product_filter)]
    if 'All' not in region_filter:
        filtered_df = filtered_df[filtered_df['region'].isin(region_filter)]
    if 'All' not in channel_filter:
        filtered_df = filtered_df[filtered_df['channel'].isin(channel_filter)]
    # Data quality check: bail out early when the filters exclude everything
    if filtered_df.empty:
        st.error("No data available for the selected filters. Please adjust your selection.")
        return
    # Main dashboard content
    create_kpi_metrics(filtered_df)
    # Tabs for different views
    tab1, tab2, tab3, tab4 = st.tabs(["Sales Overview", "Customer Analytics", "Performance Table", "Funnel Analysis"])
    with tab1:
        create_sales_charts(filtered_df)
    with tab2:
        create_customer_analytics(customer_df)
    with tab3:
        create_performance_table(filtered_df)
    with tab4:
        create_funnel_analysis(filtered_df)
    # Export functionality
    st.sidebar.markdown("---")
    st.sidebar.markdown("### Export Data")
    # NOTE(review): nesting download_button inside a button means the
    # download link disappears on the next rerun; consider rendering
    # the download_button unconditionally instead.
    if st.sidebar.button("📥 Export to CSV"):
        csv = filtered_df.to_csv(index=False)
        st.sidebar.download_button(
            label="Download CSV",
            data=csv,
            file_name=f"dashboard_data_{datetime.date.today()}.csv",
            mime="text/csv"
        )
    # Footer information
    st.sidebar.markdown("---")
    st.sidebar.markdown("### Information")
    st.sidebar.info(f"""
**Data Period:** {start_date} to {end_date}
**Total Records:** {len(filtered_df):,}
**Last Updated:** {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
""")


if __name__ == "__main__":
    main()
💻 机器学习应用 python
🔴 complex
构建机器学习模型预测应用
# Machine Learning Application with Streamlit
# Complete ML pipeline with training, prediction, and evaluation
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.svm import SVC, SVR
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import io
import base64
from datetime import datetime
# 1. Page Configuration
# Top-level page setup for the ML Studio script (title, icon, wide layout).
st.set_page_config(
    page_title="ML Studio",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)
# 2. Data Generation and Loading
def generate_classification_data(n_samples=1000):
    """Create a seeded three-class customer-segment dataset.

    Each class is a Gaussian cloud over (age, income, spending_score)
    with a random gender column; rows are shuffled and features clipped
    to realistic ranges. Fully deterministic for a given n_samples.

    Args:
        n_samples: total row count (rounded down to a multiple of 3).

    Returns:
        DataFrame with age/income/spending_score/gender/target columns.
    """
    np.random.seed(42)
    per_class = n_samples // 3
    # Per-class Gaussian parameters:
    # (age mu/sd, income mu/sd, spending mu/sd), ordered by class label.
    profiles = [
        (25, 5, 30000, 8000, 30, 10),   # 0: young, low income/spend
        (40, 8, 60000, 15000, 60, 15),  # 1: middle-aged, medium
        (55, 7, 90000, 20000, 80, 10),  # 2: older, high income/spend
    ]
    parts = []
    for label, (age_mu, age_sd, inc_mu, inc_sd, sp_mu, sp_sd) in enumerate(profiles):
        parts.append(pd.DataFrame({
            'age': np.random.normal(age_mu, age_sd, per_class),
            'income': np.random.normal(inc_mu, inc_sd, per_class),
            'spending_score': np.random.normal(sp_mu, sp_sd, per_class),
            'gender': np.random.choice(['Male', 'Female'], per_class),
            'target': [label] * per_class,
        }))
    # Combine and shuffle; np.random is already seeded, so the shuffled
    # row order is reproducible across calls.
    df = pd.concat(parts, ignore_index=True)
    df = df.sample(frac=1).reset_index(drop=True)
    # Clamp features into realistic ranges
    df['age'] = np.clip(df['age'], 18, 80)
    df['income'] = np.clip(df['income'], 15000, 150000)
    df['spending_score'] = np.clip(df['spending_score'], 1, 100)
    return df
def generate_regression_data(n_samples=1000):
    """Create a seeded synthetic house-price dataset.

    Price is a linear function of size, bedrooms, bathrooms, age and a
    location premium, plus Gaussian noise; all columns are clipped or
    abs'd into sane ranges. Deterministic for a given n_samples.

    Args:
        n_samples: number of houses to generate.

    Returns:
        DataFrame with size/bedrooms/bathrooms/age/location/price columns.
    """
    np.random.seed(42)
    # Raw feature draws (same draw order as before, so values reproduce)
    sizes = np.random.normal(2000, 500, n_samples)
    bedrooms = np.random.poisson(3, n_samples)
    bathrooms = np.random.poisson(2, n_samples)
    ages = np.random.uniform(0, 50, n_samples)
    locations = np.random.choice(['Downtown', 'Suburb', 'Rural'], n_samples)
    # Linear pricing model: per-feature contributions plus a location
    # premium, an age discount, and Gaussian noise.
    location_premium = np.where(locations == 'Downtown', 100000,
                                np.where(locations == 'Suburb', 50000, 0))
    prices = (50000 + sizes * 100 + bedrooms * 15000 + bathrooms * 10000
              - ages * 1000 + location_premium
              + np.random.normal(0, 25000, n_samples))
    return pd.DataFrame({
        'size': np.abs(sizes),
        'bedrooms': np.clip(bedrooms, 1, 6),
        'bathrooms': np.clip(bathrooms, 1, 4),
        'age': np.abs(ages),
        'location': locations,
        'price': np.abs(prices),
    })
# 3. Data Preprocessing
def preprocess_classification_data(df):
    """Preprocess data for classification.

    Label-encodes the 'gender' column and selects the model features.

    Args:
        df: raw classification DataFrame (not mutated).

    Returns:
        Tuple (X, y, gender_encoder) where X holds the feature columns,
        y the 'target' column, and gender_encoder the fitted LabelEncoder.
    """
    working = df.copy()  # never mutate the caller's frame
    gender_encoder = LabelEncoder()
    working['gender_encoded'] = gender_encoder.fit_transform(working['gender'])
    feature_columns = ['age', 'income', 'spending_score', 'gender_encoded']
    return working[feature_columns], working['target'], gender_encoder
def preprocess_regression_data(df):
    """Preprocess data for regression.

    Label-encodes the 'location' column and selects the model features.

    Args:
        df: raw regression DataFrame (not mutated).

    Returns:
        Tuple (X, y, location_encoder) where X holds the feature columns,
        y the 'price' column, and location_encoder the fitted LabelEncoder.
    """
    working = df.copy()  # never mutate the caller's frame
    location_encoder = LabelEncoder()
    working['location_encoded'] = location_encoder.fit_transform(working['location'])
    feature_columns = ['size', 'bedrooms', 'bathrooms', 'age', 'location_encoded']
    return working[feature_columns], working['price'], location_encoder
# 4. Model Training
def train_classification_model(X, y, model_type, test_size=0.2):
    """Train classification model.

    Stratified split, standard scaling, model fit, and held-out accuracy.

    Args:
        X: feature DataFrame.
        y: class labels.
        model_type: one of "Random Forest", "Logistic Regression", "SVM".
        test_size: held-out fraction for evaluation.

    Returns:
        dict with the fitted model, scaler, train/test splits,
        predictions, class probabilities, accuracy and feature names.

    Raises:
        ValueError: if model_type is not one of the supported names.
    """
    # Split data; stratify keeps class proportions in both splits
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )
    # Scale features (fit on train only to avoid test-set leakage)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # Select and train model
    if model_type == "Random Forest":
        model = RandomForestClassifier(n_estimators=100, random_state=42)
    elif model_type == "Logistic Regression":
        model = LogisticRegression(random_state=42, max_iter=1000)
    elif model_type == "SVM":
        # probability=True so predict_proba works below
        model = SVC(random_state=42, probability=True)
    else:
        raise ValueError("Unknown model type")
    model.fit(X_train_scaled, y_train)
    # Make predictions
    y_pred = model.predict(X_test_scaled)
    y_pred_proba = model.predict_proba(X_test_scaled)
    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    return {
        'model': model,
        'scaler': scaler,
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
        'y_pred': y_pred,
        'y_pred_proba': y_pred_proba,
        'accuracy': accuracy,
        'feature_names': X.columns.tolist()
    }
def train_regression_model(X, y, model_type, test_size=0.2):
    """Train regression model.

    Split, standard scaling, model fit, and held-out error metrics
    (MSE/RMSE/MAE/R2).

    Args:
        X: feature DataFrame.
        y: continuous target.
        model_type: one of "Random Forest", "Linear Regression", "SVR".
        test_size: held-out fraction for evaluation.

    Returns:
        dict with the fitted model, scaler, splits, predictions,
        error metrics and feature names.

    Raises:
        ValueError: if model_type is not one of the supported names.
    """
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    # Scale features (fit on train only to avoid test-set leakage)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # Select and train model
    if model_type == "Random Forest":
        model = RandomForestRegressor(n_estimators=100, random_state=42)
    elif model_type == "Linear Regression":
        model = LinearRegression()
    elif model_type == "SVR":
        model = SVR()
    else:
        raise ValueError("Unknown model type")
    model.fit(X_train_scaled, y_train)
    # Make predictions
    y_pred = model.predict(X_test_scaled)
    # Calculate metrics
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    return {
        'model': model,
        'scaler': scaler,
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
        'y_pred': y_pred,
        'mse': mse,
        'rmse': rmse,
        'mae': mae,
        'r2': r2,
        'feature_names': X.columns.tolist()
    }
# 5. Visualization Functions
def plot_classification_results(results):
    """Create visualizations for classification results.

    Shows a confusion-matrix heatmap, a prediction histogram and, for
    models exposing feature_importances_, an importance bar chart.

    Args:
        results: dict returned by train_classification_model.
    """
    st.subheader("📊 Model Performance Visualization")
    # Confusion Matrix
    cm = confusion_matrix(results['y_test'], results['y_pred'])
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    # Confusion Matrix Heatmap
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax1)
    ax1.set_title('Confusion Matrix')
    ax1.set_xlabel('Predicted')
    ax1.set_ylabel('Actual')
    # Prediction Distribution
    ax2.hist(results['y_pred'], bins=20, alpha=0.7, color='skyblue')
    ax2.set_title('Prediction Distribution')
    ax2.set_xlabel('Predicted Class')
    ax2.set_ylabel('Frequency')
    st.pyplot(fig)
    # Feature Importance (only models exposing feature_importances_,
    # e.g. the random forest; skipped for the others)
    if hasattr(results['model'], 'feature_importances_'):
        st.subheader("🔍 Feature Importance")
        feature_importance = pd.DataFrame({
            'feature': results['feature_names'],
            'importance': results['model'].feature_importances_
        }).sort_values('importance', ascending=False)
        fig_imp = px.bar(
            feature_importance,
            x='importance',
            y='feature',
            orientation='h',
            title='Feature Importance'
        )
        st.plotly_chart(fig_imp, use_container_width=True)
def plot_regression_results(results):
    """Create visualizations for regression results.

    Shows an actual-vs-predicted scatter, residual diagnostics, a
    sorted comparison plot and, for models exposing
    feature_importances_, an importance bar chart.

    Args:
        results: dict returned by train_regression_model.
    """
    st.subheader("📊 Model Performance Visualization")
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
    # Scatter plot: Actual vs Predicted, with the ideal y=x reference line
    ax1.scatter(results['y_test'], results['y_pred'], alpha=0.6)
    ax1.plot([results['y_test'].min(), results['y_test'].max()],
             [results['y_test'].min(), results['y_test'].max()], 'r--', lw=2)
    ax1.set_xlabel('Actual')
    ax1.set_ylabel('Predicted')
    ax1.set_title('Actual vs Predicted')
    # Residual Plot (should look structureless around zero for a good fit)
    residuals = results['y_test'] - results['y_pred']
    ax2.scatter(results['y_pred'], residuals, alpha=0.6)
    ax2.axhline(y=0, color='r', linestyle='--')
    ax2.set_xlabel('Predicted')
    ax2.set_ylabel('Residuals')
    ax2.set_title('Residual Plot')
    # Histogram of Residuals
    ax3.hist(residuals, bins=30, alpha=0.7, color='skyblue')
    ax3.set_xlabel('Residuals')
    ax3.set_ylabel('Frequency')
    ax3.set_title('Distribution of Residuals')
    # Actual vs Predicted, both re-ordered by ascending actual value
    # (argsort yields positions, so iloc / array indexing line up)
    sorted_indices = np.argsort(results['y_test'])
    ax4.plot(results['y_test'].iloc[sorted_indices].values, label='Actual', alpha=0.7)
    ax4.plot(results['y_pred'][sorted_indices], label='Predicted', alpha=0.7)
    ax4.set_xlabel('Sample Index (sorted by actual value)')
    ax4.set_ylabel('Value')
    ax4.set_title('Actual vs Predicted Values')
    ax4.legend()
    st.pyplot(fig)
    # Feature Importance (only models exposing feature_importances_)
    if hasattr(results['model'], 'feature_importances_'):
        st.subheader("🔍 Feature Importance")
        feature_importance = pd.DataFrame({
            'feature': results['feature_names'],
            'importance': results['model'].feature_importances_
        }).sort_values('importance', ascending=False)
        fig_imp = px.bar(
            feature_importance,
            x='importance',
            y='feature',
            orientation='h',
            title='Feature Importance'
        )
        st.plotly_chart(fig_imp, use_container_width=True)
# 6. Prediction Interface
def create_prediction_interface(results, task_type):
    """Render an interactive form for single-sample predictions.

    Parameters
    ----------
    results : dict
        Training results; must contain a fitted 'model' and the 'scaler'
        used during training.
    task_type : str
        "classification" or "regression" (case-insensitive).
    """
    st.subheader("🔮 Make Predictions")
    # Case-insensitive check: main() passes the sidebar value
    # "Classification" (capitalized), while this function previously
    # matched only the lowercase literal — so classification tasks were
    # silently routed into the regression branch.
    if task_type.lower() == "classification":
        # Classification prediction form (customer segmentation features)
        col1, col2 = st.columns(2)
        with col1:
            age = st.slider("Age", 18, 80, 35)
            income = st.slider("Annual Income ($)", 15000, 150000, 50000)
        with col2:
            spending_score = st.slider("Spending Score", 1, 100, 50)
            gender = st.selectbox("Gender", ["Male", "Female"])
        # Encode inputs exactly as during training
        gender_encoded = 1 if gender == "Male" else 0
        input_data = pd.DataFrame({
            'age': [age],
            'income': [income],
            'spending_score': [spending_score],
            'gender_encoded': [gender_encoded]
        })
        if st.button("Predict Customer Segment"):
            # Apply the training-time scaler before predicting
            input_scaled = results['scaler'].transform(input_data)
            prediction = results['model'].predict(input_scaled)[0]
            probabilities = results['model'].predict_proba(input_scaled)[0]
            segment_names = ['Basic Customer', 'Regular Customer', 'Premium Customer']
            st.success(f"Predicted Segment: {segment_names[prediction]}")
            # Show per-class probabilities as a bar chart
            prob_df = pd.DataFrame({
                'Segment': segment_names,
                'Probability': probabilities
            })
            fig_prob = px.bar(
                prob_df,
                x='Segment',
                y='Probability',
                title='Prediction Probabilities'
            )
            st.plotly_chart(fig_prob, use_container_width=True)
    else:
        # Regression prediction form (house price features)
        col1, col2 = st.columns(2)
        with col1:
            size = st.slider("House Size (sq ft)", 500, 5000, 2000)
            bedrooms = st.slider("Number of Bedrooms", 1, 6, 3)
        with col2:
            bathrooms = st.slider("Number of Bathrooms", 1, 4, 2)
            age = st.slider("House Age (years)", 0, 50, 10)
        location = st.selectbox("Location", ["Downtown", "Suburb", "Rural"])
        # Ordinal encoding must match the training pipeline
        location_encoded = {'Downtown': 2, 'Suburb': 1, 'Rural': 0}[location]
        input_data = pd.DataFrame({
            'size': [size],
            'bedrooms': [bedrooms],
            'bathrooms': [bathrooms],
            'age': [age],
            'location_encoded': [location_encoded]
        })
        if st.button("Predict House Price"):
            # Apply the training-time scaler before predicting
            input_scaled = results['scaler'].transform(input_data)
            prediction = results['model'].predict(input_scaled)[0]
            st.success(f"Predicted House Price: ${prediction:,.2f}")
            # Mock ±10% confidence interval for presentation purposes only
            lower_bound = prediction * 0.9
            upper_bound = prediction * 1.1
            st.info(f"Estimated Price Range: ${lower_bound:,.2f} - ${upper_bound:,.2f}")
# 7. Model Comparison
def compare_models(results_list, model_names, task_type):
    """Display a side-by-side comparison table and charts for trained models.

    Parameters
    ----------
    results_list : list of dict
        Training results; classification dicts need 'accuracy' and 'model',
        regression dicts need 'rmse', 'mae' and 'r2'.
    model_names : list of str
        Display names aligned one-to-one with results_list.
    task_type : str
        "classification" or "regression" (case-insensitive).
    """
    st.subheader("🆚 Model Comparison")
    # Case-insensitive check: main() passes "Classification" (capitalized),
    # but the old exact lowercase match sent classification results into
    # the regression branch, raising KeyError on 'rmse'/'mae'/'r2'.
    if task_type.lower() == "classification":
        # Accuracy table plus parameter count for a rough complexity hint
        comparison_data = []
        for i, result in enumerate(results_list):
            comparison_data.append({
                'Model': model_names[i],
                'Accuracy': result['accuracy'],
                'Parameters': len(result['model'].get_params())
            })
        comparison_df = pd.DataFrame(comparison_data)
        st.dataframe(comparison_df, use_container_width=True)
        fig_acc = px.bar(
            comparison_df,
            x='Model',
            y='Accuracy',
            title='Model Accuracy Comparison'
        )
        st.plotly_chart(fig_acc, use_container_width=True)
    else:
        # Regression metrics table: RMSE / MAE (lower better), R² (higher better)
        comparison_data = []
        for i, result in enumerate(results_list):
            comparison_data.append({
                'Model': model_names[i],
                'RMSE': result['rmse'],
                'MAE': result['mae'],
                'R²': result['r2']
            })
        comparison_df = pd.DataFrame(comparison_data)
        st.dataframe(comparison_df, use_container_width=True)
        # Side-by-side charts for the two headline metrics
        col1, col2 = st.columns(2)
        with col1:
            fig_rmse = px.bar(
                comparison_df,
                x='Model',
                y='RMSE',
                title='RMSE Comparison (Lower is Better)'
            )
            st.plotly_chart(fig_rmse, use_container_width=True)
        with col2:
            fig_r2 = px.bar(
                comparison_df,
                x='Model',
                y='R²',
                title='R² Comparison (Higher is Better)'
            )
            st.plotly_chart(fig_r2, use_container_width=True)
# 8. Main Application
def main():
    """Main ML Studio app: configure, generate data, train, evaluate, predict."""
    st.title("🤖 Machine Learning Studio")
    st.markdown("Train, evaluate, and compare machine learning models with ease")

    # --- Sidebar configuration ---
    st.sidebar.title("Configuration")
    task_type = st.sidebar.selectbox(
        "Select Task Type",
        ["Classification", "Regression"],
        help="Choose between classification or regression tasks"
    )

    if task_type == "Classification":
        model_options = ["Random Forest", "Logistic Regression", "SVM"]
        dataset_type = "Customer Segmentation"
    else:
        model_options = ["Random Forest", "Linear Regression", "SVR"]
        dataset_type = "House Price Prediction"

    selected_models = st.sidebar.multiselect(
        "Select Models to Train",
        model_options,
        default=[model_options[0]],
        help="Choose one or more models to train and compare"
    )

    st.sidebar.subheader("Data Parameters")
    n_samples = st.sidebar.slider("Number of Samples", 500, 5000, 1000)
    test_size = st.sidebar.slider("Test Set Size (%)", 10, 40, 20) / 100

    # --- Dataset generation ---
    st.sidebar.markdown("---")
    if st.sidebar.button("🔄 Generate New Data"):
        # Clear the cached dataset AND every result key; previously only
        # 'results' was cleared while the UI reads 'last_results', so stale
        # results kept being displayed after regenerating data.
        for key in ('data', 'data_task', 'results', 'last_results', 'last_models'):
            st.session_state.pop(key, None)
        st.rerun()

    # Regenerate when no dataset is cached OR the task type changed.
    # Previously switching Classification <-> Regression kept the old
    # task's dataset, crashing downstream preprocessing.
    if 'data' not in st.session_state or st.session_state.get('data_task') != task_type:
        with st.spinner("Generating dataset..."):
            if task_type == "Classification":
                st.session_state['data'] = generate_classification_data(n_samples)
            else:
                st.session_state['data'] = generate_regression_data(n_samples)
        st.session_state['data_task'] = task_type
        # Results trained on the other task's data are no longer valid
        st.session_state.pop('last_results', None)
        st.session_state.pop('last_models', None)
    data = st.session_state['data']

    # --- Dataset overview ---
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Dataset Size", f"{len(data):,} samples")
    with col2:
        st.metric("Features", f"{data.shape[1] - 1}")
    with col3:
        if task_type == "Classification":
            st.metric("Classes", data['target'].nunique())
        else:
            st.metric("Price Range", f"${data['price'].min():,.0f} - ${data['price'].max():,.0f}")

    with st.expander("📋 View Dataset"):
        st.dataframe(data.head(10), use_container_width=True)
        st.subheader("📈 Data Statistics")
        st.dataframe(data.describe(), use_container_width=True)

    # --- Training ---
    if st.button(f"🚀 Train {len(selected_models)} Model(s)"):
        if not selected_models:
            st.error("Please select at least one model to train.")
            return
        progress_bar = st.progress(0)
        status_text = st.empty()
        results = []
        for i, model_name in enumerate(selected_models):
            status_text.text(f"Training {model_name}...")
            if task_type == "Classification":
                X, y, _ = preprocess_classification_data(data)
                result = train_classification_model(X, y, model_name, test_size)
            else:
                X, y, _ = preprocess_regression_data(data)
                result = train_regression_model(X, y, model_name, test_size)
            results.append(result)
            # Advance the bar AFTER the model finished (the old code
            # advanced it before training, so it reached 100% too early).
            progress_bar.progress((i + 1) / len(selected_models))
        status_text.text("Training completed!")
        st.session_state['last_results'] = results
        st.session_state['last_models'] = selected_models

    # --- Results display (persists across reruns via session state) ---
    if 'last_results' in st.session_state:
        results = st.session_state['last_results']
        model_names = st.session_state['last_models']

        if len(results) > 1:
            # compare_models matches the lowercase literal "classification";
            # the sidebar value is capitalized, so normalize it here.
            compare_models(results, model_names, task_type.lower())

        for result, model_name in zip(results, model_names):
            st.markdown("---")
            st.subheader(f"📋 {model_name} Results")
            col1, col2, col3 = st.columns(3)
            if task_type == "Classification":
                with col1:
                    st.metric("Accuracy", f"{result['accuracy']:.4f}")
                with col2:
                    st.metric("Test Samples", len(result['y_test']))
                with col3:
                    st.metric("Training Samples", len(result['y_train']))
                plot_classification_results(result)
                with st.expander("📄 Detailed Classification Report"):
                    report = classification_report(
                        result['y_test'],
                        result['y_pred'],
                        output_dict=True
                    )
                    report_df = pd.DataFrame(report).transpose()
                    st.dataframe(report_df, use_container_width=True)
            else:
                with col1:
                    st.metric("RMSE", f"{result['rmse']:.2f}")
                with col2:
                    st.metric("R²", f"{result['r2']:.4f}")
                with col3:
                    st.metric("MAE", f"{result['mae']:.2f}")
                plot_regression_results(result)

        # --- Prediction interface for the best-performing model ---
        if task_type == "Classification":
            best_idx = np.argmax([r['accuracy'] for r in results])
        else:
            best_idx = np.argmin([r['rmse'] for r in results])
        st.markdown("---")
        st.subheader(f"🎯 Prediction Interface (Best: {model_names[best_idx]})")
        # Same normalization: create_prediction_interface compares against
        # the lowercase literal "classification".
        create_prediction_interface(results[best_idx], task_type.lower())

    # --- Footer ---
    st.sidebar.markdown("---")
    st.sidebar.markdown("### About")
    st.sidebar.info(f"""
    **Current Task:** {task_type}
    **Dataset:** {dataset_type}
    **Models Available:** {len(model_options)}
    **Last Updated:** {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    """)
# Script entry point: run the Streamlit ML Studio app
# (launched via `streamlit run <this_file>.py`).
if __name__ == "__main__":
    main()