🎯 Recommended Collections
Balanced sample collections from various categories for you to explore
Scala Examples
Essential Scala programming examples and functional programming concepts
💻 Scala Hello World scala
🟢 simple
⭐⭐
Basic Hello World program and fundamental syntax examples
⏱️ 15 min
🏷️ scala, programming, beginner, functional, jvm
Prerequisites:
Basic programming concepts, Understanding of JVM ecosystem
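The snippets below can be pasted into the Scala REPL or compiled in an sbt project. A minimal build.sbt sketch for the sbt route (the project name and version numbers are illustrative assumptions, not prescribed by these examples):
// build.sbt -- minimal sketch, assuming sbt 1.x and Scala 2.13
ThisBuild / scalaVersion := "2.13.12"
lazy val root = (project in file("."))
  .settings(name := "scala-examples")
With this in place, `sbt console` opens a REPL where the examples can be pasted directly, and `sbt run` executes any of the objects that define a main entry point.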
// Scala Hello World Examples
// 1. Basic Hello World
object HelloWorld {
def main(args: Array[String]): Unit = {
println("Hello, World!")
}
}
// 2. Hello World with App trait
object HelloWorldApp extends App {
println("Hello, World from App!")
}
// 3. Hello World with a function (snippets 3-6 are REPL/script-style; wrap them in an object to compile them as a regular source file)
def sayHello(): String = {
"Hello, World!"
}
println(sayHello())
// 4. Hello World with function parameters
def greet(name: String): String = {
s"Hello, $name!"
}
println(greet("World"))
println(greet("Scala"))
// 5. Hello World with class
class Greeter(message: String = "Hello, World!") {
def greet(): String = message
}
val greeter = new Greeter()
println(greeter.greet())
val customGreeter = new Greeter("Hello from custom class!")
println(customGreeter.greet())
// 6. Hello World with case class
case class Person(name: String, greeting: String = "Hello") {
def sayHello(): String = s"$greeting, $name!"
}
val person = Person("World")
println(person.sayHello())
val customPerson = Person("Scala", "Greetings")
println(customPerson.sayHello())
// 7. Hello World multiple times
object MultipleHellos extends App {
for (i <- 1 to 5) {
println(s"Hello, World! $i")
}
}
// 8. Hello World with list
object ListHellos extends App {
val greetings = List("Hello", "Bonjour", "Hola", "Ciao", "こんにちは")
greetings.foreach { greeting =>
println(s"$greeting, World!")
}
}
// 9. Hello World with map
object MapHellos extends App {
val greetings = Map(
"en" -> "Hello",
"es" -> "Hola",
"fr" -> "Bonjour",
"de" -> "Hallo",
"ja" -> "こんにちは"
)
greetings.foreach { case (lang, greeting) =>
println(s"$greeting, World! ($lang)")
}
}
// 10. Hello World with pattern matching
object PatternMatchingHello extends App {
def greetByTime(hour: Int): String = {
hour match {
case h if h < 12 => "Good morning"
case h if h < 18 => "Good afternoon"
case _ => "Good evening"
}
}
val currentHour = 14
println(s"${greetByTime(currentHour)}, World!")
}
// Basic data types examples
object DataTypes extends App {
// Numbers
val integer: Int = 42
val long: Long = 1000000L
val float: Float = 3.14f
val double: Double = 2.71828
// String
val text: String = "Scala"
// Boolean
val isAwesome: Boolean = true
// Collections
val numbers: List[Int] = List(1, 2, 3, 4, 5)
val fruits: List[String] = List("apple", "banana", "cherry")
val scores: Map[String, Int] = Map(
"math" -> 95,
"science" -> 88,
"english" -> 92
)
println(s"Integer: $integer, Type: ${integer.getClass.getSimpleName}")
println(s"Double: $double, Type: ${double.getClass.getSimpleName}")
println(s"String: $text, Type: ${text.getClass.getSimpleName}")
println(s"Boolean: $isAwesome, Type: ${isAwesome.getClass.getSimpleName}")
println(s"List: $numbers, Type: ${numbers.getClass.getSimpleName}")
println(s"Map: $scores, Type: ${scores.getClass.getSimpleName}")
}
// Control flow examples
object ControlFlow extends App {
val age = 18
// If-else expression
val message = if (age >= 18) {
"You are an adult"
} else {
"You are a minor"
}
println(message)
// Match expression (Scala's switch)
val grade = 'A'
val gradeMessage = grade match {
case 'A' => "Excellent!"
case 'B' => "Good!"
case 'C' => "Fair"
case _ => "Needs improvement"
}
println(gradeMessage)
// For comprehension
val fruits = List("apple", "banana", "cherry")
for (fruit <- fruits) {
println(s"I like $fruit")
}
// While loop
var count = 0
while (count < 3) {
println(s"Count: $count")
count += 1
}
}
// String interpolation and operations
object StringOperations extends App {
val name = "Scala"
val version = 3
val rating = 4.5
println(s"Language: $name")
println(s"Version: $version")
println(s"Rating: $rating")
println(s"Formatted: Language $name version $version has rating $rating")
// f-interpolator for formatted strings
println(f"Pi: ${math.Pi}%.2f")
println(f"Number: ${42}%05d")
// String methods
val text = "Hello Scala World"
println(s"Original: $text")
println(s"Upper: ${text.toUpperCase}")
println(s"Lower: ${text.toLowerCase}")
println(s"Words: ${text.split(" ").mkString("[", "][", "]")}")
println(s"Length: ${text.length}")
}
// Option type for null safety
object OptionExamples extends App {
def findUser(id: Int): Option[String] = {
if (id > 0) Some(s"User$id") else None
}
val user1 = findUser(1)
val user2 = findUser(-1)
// Pattern matching on Option
user1 match {
case Some(name) => println(s"Found user: $name")
case None => println("User not found")
}
// Using getOrElse
println(s"User2 name: ${user2.getOrElse("Unknown")}")
// Using map and flatMap
val userNameLength = user1.map(_.length)
println(s"User1 name length: $userNameLength")
}
// Tuple examples
object TupleExamples extends App {
// Creating tuples
val person: Tuple2[String, Int] = ("John", 25)
val coordinates = (10.5, 20.3)
val triple = ("Alice", 30, "Engineer")
// Accessing tuple elements
println(s"Name: ${person._1}, Age: ${person._2}")
println(s"X: ${coordinates._1}, Y: ${coordinates._2}")
// Pattern matching on tuples
val result = person match {
case (name, age) if age < 18 => s"$name is a minor"
case (name, age) => s"$name is ${age} years old"
}
println(result)
}
// Basic functions as values
object FunctionValues extends App {
// Function as a value
val add: (Int, Int) => Int = (a, b) => a + b
val multiply: (Int, Int) => Int = (a, b) => a * b
println(s"5 + 3 = ${add(5, 3)}")
println(s"5 * 3 = ${multiply(5, 3)}")
// Higher-order function
def applyOperation(a: Int, b: Int, operation: (Int, Int) => Int): Int = {
operation(a, b)
}
println(s"Apply addition: ${applyOperation(10, 5, add)}")
println(s"Apply multiplication: ${applyOperation(10, 5, multiply)}")
// Function composition
val addOne: Int => Int = _ + 1
val multiplyByTwo: Int => Int = _ * 2
val composedFunction = addOne.andThen(multiplyByTwo)
println(s"Composed function (5): ${composedFunction(5)}")
}
💻 Scala Functional Programming scala
🟡 intermediate
⭐⭐⭐⭐
Advanced functional programming concepts: immutability, higher-order functions, and the type system
⏱️ 30 min
🏷️ scala, functional, monads, immutability, type system
Prerequisites:
Scala basics, Understanding of functional programming concepts
// Scala Functional Programming Examples
import scala.util.{Try, Success, Failure}
// 1. Immutability and immutable collections
object ImmutabilityExamples extends App {
// Immutable list operations
val numbers = List(1, 2, 3, 4, 5)
val doubledNumbers = numbers.map(_ * 2)
val evenNumbers = numbers.filter(_ % 2 == 0)
val sum = numbers.foldLeft(0)(_ + _)
println(s"Original: $numbers")
println(s"Doubled: $doubledNumbers")
println(s"Even numbers: $evenNumbers")
println(s"Sum: $sum")
// Immutable Map operations
val scores = Map("Alice" -> 95, "Bob" -> 87, "Charlie" -> 92)
val updatedScores = scores + ("David" -> 88)
val charlieScore = scores.get("Charlie")
println(s"Scores: $scores")
println(s"Updated scores: $updatedScores")
println(s"Charlie's score: $charlieScore")
}
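// Additional sketch (not part of the original examples): immutable "updates" via
// case class copy -- a new value is created and the original stays untouched.
object CopyUpdateSketch extends App {
  case class Account(owner: String, balance: Double)
  val acc = Account("Alice", 100.0)
  val deposited = acc.copy(balance = acc.balance + 50.0) // acc itself is unchanged
  println(s"Before: $acc, after: $deposited")
}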
// 2. Higher-order functions
object HigherOrderFunctions extends App {
// Function that takes a function as parameter
def processList[A, B](list: List[A])(operation: A => B): List[B] = {
list.map(operation)
}
def processListWithPredicate[A](
list: List[A]
)(predicate: A => Boolean)(operation: A => A): List[A] = {
list.filter(predicate).map(operation)
}
val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Using processList
val strings = processList(numbers)(_.toString)
println(s"Numbers as strings: $strings")
// Using processListWithPredicate
val doubledEvens = processListWithPredicate(numbers)(_ % 2 == 0)(_ * 2)
println(s"Doubled even numbers: $doubledEvens")
// Function that returns a function (currying)
def multiplier(factor: Int): Int => Int = {
(x: Int) => x * factor
}
val doubler = multiplier(2)
val tripler = multiplier(3)
println(s"Doubler(5): ${doubler(5)}")
println(s"Tripler(5): ${tripler(5)}")
// Curried function
def add(a: Int)(b: Int): Int = a + b
val addFive = add(5) _
println(s"Add 5 to 10: ${addFive(10)}")
// Function composition
val addOne: Int => Int = _ + 1
val multiplyByTwo: Int => Int = _ * 2
// named intToString to avoid clashing with the inherited AnyRef.toString
val intToString: Int => String = _.toString
val composed1 = addOne.andThen(multiplyByTwo)
val composed2 = addOne.compose(multiplyByTwo)
val composed3 = intToString.compose(addOne).compose(multiplyByTwo)
println(s"Composed1 (5 -> add -> multiply): ${composed1(5)}")
println(s"Composed2 (5 -> multiply -> add): ${composed2(5)}")
println(s"Composed3 (5 -> multiply -> add -> string): ${composed3(5)}")
}
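// Additional sketch (not part of the original examples): methods can also be passed
// where function values are expected (eta-expansion); "square _" lifts the method
// to a function value.
object EtaExpansionSketch extends App {
  def square(x: Int): Int = x * x
  val squares = List(1, 2, 3, 4).map(square _)
  val squareFn: Int => Int = square _
  println(s"Squares: $squares, squareFn(6) = ${squareFn(6)}")
}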
// 3. Pattern matching advanced
object AdvancedPatternMatching extends App {
// Matching on case classes
sealed trait Shape
case class Circle(radius: Double) extends Shape
case class Rectangle(width: Double, height: Double) extends Shape
case class Triangle(base: Double, height: Double) extends Shape
def calculateArea(shape: Shape): Double = shape match {
case Circle(radius) => math.Pi * radius * radius
case Rectangle(width, height) => width * height
case Triangle(base, height) => (base * height) / 2
}
val shapes = List(
Circle(5.0),
Rectangle(4.0, 6.0),
Triangle(3.0, 4.0)
)
shapes.foreach { shape =>
println(s"Shape: $shape, Area: ${calculateArea(shape)}")
}
// Pattern matching with guards
def describeNumber(num: Int): String = num match {
case n if n < 0 => s"$n is negative"
case 0 => "Zero"
case n if n < 10 => s"$n is a single digit"
case n if n % 2 == 0 => s"$n is even"
case n => s"$n is odd"
}
val numbers = List(-5, 0, 7, 8, 15)
numbers.foreach(n => println(s"$n: ${describeNumber(n)}"))
// Matching on collections
def analyzeList(list: List[Int]): String = list match {
case Nil => "Empty list"
case head :: Nil => s"Single element: $head"
case first :: second :: Nil => s"Two elements: $first, $second"
case first :: rest => s"First: $first, rest: ${rest.size} elements"
}
val lists = List(Nil, List(1), List(1, 2), List(1, 2, 3, 4))
lists.foreach(lst => println(s"$lst -> ${analyzeList(lst)}"))
}
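// Additional sketch (not part of the original examples): a custom extractor (unapply),
// the same mechanism that powers the case class patterns above. Names are illustrative.
object CustomExtractorSketch extends App {
  object Email {
    def unapply(s: String): Option[(String, String)] = s.split("@") match {
      case Array(user, domain) => Some((user, domain))
      case _ => None
    }
  }
  "alice@example.com" match {
    case Email(user, domain) => println(s"user = $user, domain = $domain")
    case other => println(s"Not an email address: $other")
  }
}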
// 4. Either and Try for error handling
object ErrorHandling extends App {
// Either for handling errors that can be recovered
def divideEither(a: Int, b: Int): Either[String, Double] = {
if (b == 0) Left("Division by zero")
else Right(a.toDouble / b)
}
def sqrtEither(x: Double): Either[String, Double] = {
if (x < 0) Left("Cannot calculate square root of negative number")
else Right(math.sqrt(x))
}
// Composing Either operations
def divideAndSqrt(a: Int, b: Int, x: Double): Either[String, Double] = {
for {
result <- divideEither(a, b)
sqrtResult <- sqrtEither(result)
} yield sqrtResult
}
val result1 = divideAndSqrt(16, 2, 0) // Success
val result2 = divideAndSqrt(16, 0, 0) // Division by zero
val result3 = divideAndSqrt(-16, 2, 0) // Square root of negative
println(s"Result1: ${result1.getOrElse("Error")}")
println(s"Result2: ${result2.left.getOrElse("Success")}")
println(s"Result3: ${result3.left.getOrElse("Success")}")
// Try for exception handling
def divideTry(a: Int, b: Int): Try[Double] = {
Try(a.toDouble / b)
}
def readFileTry(filename: String): Try[String] = {
Try {
val source = scala.io.Source.fromFile(filename)
val content = source.mkString
source.close()
content
}
}
// Working with Try
val tryResult = divideTry(10, 2)
val tryFailure = divideTry(10, 0)
tryResult match {
case Success(value) => println(s"Success: $value")
case Failure(exception) => println(s"Failure: ${exception.getMessage}")
}
// Transforming Try
val transformedTry = tryResult.map(_ * 2).filter(_ > 5).getOrElse(0.0)
println(s"Transformed result: $transformedTry")
}
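// Additional sketch (not part of the original examples): recovering from a failed Try
// and bridging Try and Either (Try#toEither requires Scala 2.12+).
object TryRecoverySketch extends App {
  val parsed: Try[Int] = Try("not a number".toInt) // Failure(NumberFormatException)
  val withDefault: Int = parsed.recover { case _: NumberFormatException => 0 }.get
  val asEither: Either[Throwable, Int] = parsed.toEither
  println(s"Recovered value: $withDefault")
  println(s"As Either: $asEither")
}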
// 5. Lazy evaluation
object LazyEvaluation extends App {
// Lazy values
lazy val expensiveComputation: Int = {
println("Computing expensive value...")
Thread.sleep(1000) // Simulate expensive operation
42
}
println("Before using lazy value")
println(s"Lazy value: $expensiveComputation")
println(s"Lazy value again: $expensiveComputation")
// Infinite streams using lazy evaluation (Stream is deprecated since Scala 2.13; LazyList, shown below, is the replacement)
def naturalNumbers: Stream[Int] = 1 #:: naturalNumbers.map(_ + 1)
val firstTenNumbers = naturalNumbers.take(10).toList
println(s"First 10 natural numbers: $firstTenNumbers")
def fibonacci: Stream[Int] = {
def fib(a: Int, b: Int): Stream[Int] = a #:: fib(b, a + b)
fib(0, 1)
}
val firstTenFibonacci = fibonacci.take(10).toList
println(s"First 10 Fibonacci numbers: $firstTenFibonacci")
// Lazy lists (LazyList in Scala 2.13+)
// Self-referential lazy definition: element n is n!, so the sequence starts 1, 1, 2, 6, 24, ...
lazy val lazyFactorials: LazyList[BigInt] =
BigInt(1) #:: lazyFactorials.zipWithIndex.map { case (fact, i) => fact * (i + 1) }
val firstFiveFactorials = lazyFactorials.take(5).toList
println(s"First 5 factorials: $firstFiveFactorials")
}
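// Additional sketch (not part of the original examples): collection views give similar
// on-demand evaluation for ordinary finite collections -- nothing is computed until forced.
object ViewSketch extends App {
  val firstMatches = (1 to 1000000).view
    .map(_ * 2)
    .filter(_ % 3 == 0)
    .take(5)
    .toList // evaluation happens here, and only for the elements actually needed
  println(s"First five results: $firstMatches")
}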
// 6. Custom types and type classes
object CustomTypes extends App {
// Custom algebraic data type
sealed trait Weather
case object Sunny extends Weather
case object Cloudy extends Weather
case object Rainy extends Weather
case class Temperature(degrees: Double, scale: String)
case class DayForecast(
day: String,
weather: Weather,
temperature: Option[Temperature]
)
// Weather operations
def getWeatherDescription(forecast: DayForecast): String = {
val weatherDesc = forecast.weather match {
case Sunny => "sunny"
case Cloudy => "cloudy"
case Rainy => "rainy"
}
val tempDesc = forecast.temperature match {
case Some(Temperature(deg, "C")) => s"${deg}°C"
case Some(Temperature(deg, "F")) => s"${deg}°F"
case Some(Temperature(deg, _)) => s"${deg}°"
case None => "temperature unknown"
}
s"${forecast.day} will be $weatherDesc with $tempDesc"
}
val forecasts = List(
DayForecast("Monday", Sunny, Some(Temperature(25.0, "C"))),
DayForecast("Tuesday", Cloudy, Some(Temperature(18.0, "C"))),
DayForecast("Wednesday", Rainy, None)
)
forecasts.foreach(forecast => println(getWeatherDescription(forecast)))
// Type class pattern
trait Show[A] {
def show(value: A): String
}
object Show {
def apply[A](implicit show: Show[A]): Show[A] = show
implicit val intShow: Show[Int] = new Show[Int] {
def show(value: Int): String = s"Int: $value"
}
implicit val stringShow: Show[String] = new Show[String] {
def show(value: String): String = s"String: '$value'"
}
implicit def optionShow[A](implicit showA: Show[A]): Show[Option[A]] =
new Show[Option[A]] {
def show(value: Option[A]): String = value match {
case Some(a) => s"Some(${showA.show(a)})"
case None => "None"
}
}
}
def prettyPrint[A](value: A)(implicit show: Show[A]): Unit = {
println(show.show(value))
}
prettyPrint(42)
prettyPrint("Hello")
prettyPrint(Option(123)) // Option(...) so the inferred type is Option[Int] and the Show[Option[A]] instance applies
prettyPrint(Option.empty[String])
}
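// Additional sketch (not part of the original example): the same type class pattern
// written with a context bound -- "[A: Pretty]" is sugar for an implicit Pretty[A]
// parameter. Self-contained, so it does not reuse Show from CustomTypes above.
object ContextBoundSketch extends App {
  trait Pretty[A] { def pretty(value: A): String }
  implicit val prettyInt: Pretty[Int] = new Pretty[Int] {
    def pretty(value: Int): String = s"Int: $value"
  }
  implicit val prettyString: Pretty[String] = new Pretty[String] {
    def pretty(value: String): String = s"String: '$value'"
  }
  def display[A: Pretty](value: A): Unit = println(implicitly[Pretty[A]].pretty(value))
  display(42)
  display("Hello")
}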
// 7. Fold and scan operations
object FoldOperations extends App {
val numbers = List(1, 2, 3, 4, 5)
// Fold operations
val sum = numbers.foldLeft(0)(_ + _)
val product = numbers.foldRight(1)(_ * _)
val reversed = numbers.foldLeft(List.empty[Int])((acc, x) => x :: acc)
println(s"Sum: $sum")
println(s"Product: $product")
println(s"Reversed: $reversed")
// Scan operations
val runningSum = numbers.scanLeft(0)(_ + _)
val runningProduct = numbers.scanRight(1)(_ * _)
println(s"Running sum: $runningSum")
println(s"Running product: $runningProduct")
// Complex fold example - word count
val text = "hello world hello scala world programming"
val words = text.split("\\s+").toList
val wordCount = words.foldLeft(Map.empty[String, Int]) { (acc, word) =>
acc + (word -> (acc.getOrElse(word, 0) + 1))
}
println(s"Word count: $wordCount")
// Using fold to group by length
val wordsByLength = words.groupBy(_.length)
println(s"Words by length: $wordsByLength")
}
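// Additional sketch (not part of the original examples): why fold takes a start value.
// reduce on an empty collection throws, while foldLeft simply returns the zero element
// and reduceOption returns None.
object FoldVsReduceSketch extends App {
  val empty = List.empty[Int]
  println(s"foldLeft on empty list: ${empty.foldLeft(0)(_ + _)}")
  println(s"reduceOption on empty list: ${empty.reduceOption(_ + _)}")
}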
// 8. Monads and monadic operations
object MonadExamples extends App {
// List monad
val lists = List(1, 2, 3).map(i => List(i, i * 2)).flatten
val listsComprehension = for {
i <- List(1, 2, 3)
j <- List(i, i * 2)
} yield j
println(s"Flattened lists: $lists")
println(s"For comprehension: $listsComprehension")
// Option monad
val users = Map(
1 -> "Alice",
2 -> "Bob",
3 -> "Charlie"
)
val roles = Map(
"Alice" -> "Admin",
"Bob" -> "User",
"Charlie" -> "Moderator"
)
def getUserRole(userId: Int): Option[String] = {
for {
name <- users.get(userId)
role <- roles.get(name)
} yield role
}
println(s"Role for user 1: ${getUserRole(1)}")
println(s"Role for user 4: ${getUserRole(4)}")
// Custom monadic type
case class Maybe[A](value: Option[A]) {
def flatMap[B](f: A => Maybe[B]): Maybe[B] = {
Maybe(value.flatMap(a => f(a).value))
}
def map[B](f: A => B): Maybe[B] = {
Maybe(value.map(f))
}
def filter(p: A => Boolean): Maybe[A] = {
Maybe(value.filter(p))
}
def getOrElse(default: A): A = value.getOrElse(default)
}
object Maybe {
def apply[A](value: A): Maybe[A] = Maybe(Some(value))
def empty[A]: Maybe[A] = Maybe(None)
}
def safeDivide(a: Int, b: Int): Maybe[Int] = {
if (b == 0) Maybe.empty[Int]
else Maybe(a / b)
}
val result = for {
x <- safeDivide(10, 2)
y <- safeDivide(x, 5)
} yield y
println(s"Safe division result: ${result.getOrElse(-1)}")
}
💻 Scala for Big Data with Apache Spark scala
🔴 complex
⭐⭐⭐⭐⭐
Big data processing examples with Apache Spark and Scala
⏱️ 45 min
🏷️ scala, spark, big data, analytics, distributed computing
Prerequisites:
Scala basics, Understanding of distributed computing, Basic data analytics concepts
// Scala for Big Data with Apache Spark Examples
// Note: These examples require Apache Spark dependencies
// Add to build.sbt:
// libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.0"
// libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.0"
import org.apache.spark.sql.{SparkSession, DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
object SparkSetup {
// Initialize Spark Session
def createSparkSession(appName: String): SparkSession = {
SparkSession.builder()
.appName(appName)
.master("local[*]") // Use all available cores locally
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.getOrCreate()
}
}
// 1. Basic DataFrame operations
object DataFrameBasics {
import SparkSetup._
def basicDataFrameOperations(spark: SparkSession): Unit = {
import spark.implicits._
println("=== Basic DataFrame Operations ===")
// Create a DataFrame from a sequence
val data = Seq(
("Alice", 34, "Engineering"),
("Bob", 45, "Marketing"),
("Charlie", 29, "Engineering"),
("Diana", 38, "Sales"),
("Eve", 31, "Marketing")
)
val df = data.toDF("name", "age", "department")
println("Original DataFrame:")
df.show()
// Basic transformations
println("With uppercase names:")
df.withColumn("name_upper", upper(col("name"))).show()
println("Filtered by age > 30:")
df.filter(col("age") > 30).show()
println("Grouped by department:")
df.groupBy("department")
.agg(
count("*").as("count"),
avg("age").as("avg_age"),
max("age").as("max_age"),
min("age").as("min_age")
)
.show()
// Sort operations
println("Sorted by age descending:")
df.orderBy(desc("age")).show()
// SQL operations
df.createOrReplaceTempView("employees")
println("SQL query results:")
spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department").show()
}
}
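// Additional sketch (not part of the original example): the same kind of data as a
// typed Dataset (case class + toDS), which keeps compile-time types alongside the
// DataFrame API. The Employee case class is illustrative.
object DatasetSketch {
  case class Employee(name: String, age: Int, department: String)
  def typedExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val ds = Seq(
      Employee("Alice", 34, "Engineering"),
      Employee("Bob", 45, "Marketing")
    ).toDS()
    val senior = ds.filter(_.age > 40) // plain lambda over the case class, not Column expressions
    senior.show()
  }
}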
// 2. RDD operations
object RDDBasics {
import SparkSetup._
def basicRDDOperations(spark: SparkSession): Unit = {
println("=== Basic RDD Operations ===")
// Create RDD from collection
val numbers = spark.sparkContext.parallelize(1 to 100)
// Basic transformations
val squares = numbers.map(x => x * x)
val evenSquares = squares.filter(_ % 2 == 0)
val sumOfEvenSquares = evenSquares.reduce(_ + _)
println(s"Sum of even squares of 1-100: $sumOfEvenSquares")
// Word count example
val text = spark.sparkContext.parallelize(Seq(
"hello world hello spark",
"spark is fast",
"hello scala world",
"big data processing"
))
val wordCounts = text
.flatMap(line => line.split("\\s+"))
.map(word => (word.toLowerCase, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
println("Word counts:")
wordCounts.collect().foreach { case (word, count) =>
println(s"$word: $count")
}
// Group by key example
val data = spark.sparkContext.parallelize(Seq(
("department1", "Alice"),
("department1", "Bob"),
("department2", "Charlie"),
("department1", "Diana"),
("department2", "Eve")
))
val groupedData = data.groupByKey()
println("Grouped by department:")
groupedData.collect().foreach { case (dept, employees) =>
println(s"$dept: ${employees.mkString(", ")}")
}
// Custom partitioning
val partitionedData = numbers.repartition(4)
println(s"Number of partitions: ${partitionedData.getNumPartitions}")
}
}
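// Additional sketch (not part of the original example): the same per-department count
// with reduceByKey, which combines values on each partition before shuffling and usually
// scales better than groupByKey on large data.
object ReduceByKeySketch {
  def departmentCounts(spark: SparkSession): Unit = {
    val data = spark.sparkContext.parallelize(Seq(
      ("department1", "Alice"), ("department1", "Bob"),
      ("department2", "Charlie"), ("department1", "Diana"), ("department2", "Eve")
    ))
    val counts = data.mapValues(_ => 1).reduceByKey(_ + _)
    counts.collect().foreach { case (dept, n) => println(s"$dept: $n employees") }
  }
}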
// 3. Advanced DataFrame operations with complex data types
object AdvancedDataFrames {
import SparkSetup._
def complexDataOperations(spark: SparkSession): Unit = {
import spark.implicits._
println("=== Complex DataFrame Operations ===")
// Create schema
val schema = StructType(Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("tags", ArrayType(StringType), nullable = true),
StructField("properties", MapType(StringType, StringType), nullable = true),
StructField("created_at", TimestampType, nullable = false)
))
// Create data with complex types
val data = Seq(
Row(1, "Product1", Array("electronics", "premium"),
Map("brand" -> "BrandA", "category" -> "phones"),
java.sql.Timestamp.valueOf("2023-01-15 10:30:00")),
Row(2, "Product2", Array("books", "fiction"),
Map("author" -> "AuthorX", "genre" -> "mystery"),
java.sql.Timestamp.valueOf("2023-02-20 14:15:00")),
Row(3, "Product3", Array("electronics", "budget"),
Map("brand" -> "BrandB", "category" -> "tablets"),
java.sql.Timestamp.valueOf("2023-03-10 09:45:00"))
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
println("Complex DataFrame:")
df.show(truncate = false)
// Working with arrays
println("Exploded tags:")
df.select("id", "name", explode(col("tags")).as("tag")).show()
// Working with maps
println("Properties as separate columns:")
df.select(
"id", "name",
col("properties")("brand").as("brand"),
col("properties")("category").as("category")
).show()
// Array operations
println("Products with 'electronics' tag:")
df.filter(array_contains(col("tags"), "electronics")).show()
// JSON operations
df.select(
"id", "name",
to_json(col("properties")).as("properties_json")
).show(false)
// Window functions
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("id").orderBy(desc("created_at"))
// Add row numbers (useful for ranking)
val withRowNum = df.withColumn("row_num", row_number().over(windowSpec))
println("With row numbers:")
withRowNum.show()
// Date/time operations
df.select(
"id", "name",
date_format(col("created_at"), "yyyy-MM-dd").as("date"),
date_format(col("created_at"), "HH:mm:ss").as("time"),
datediff(current_date(), col("created_at")).as("days_ago")
).show()
}
}
// 4. Data aggregation and analytics
object DataAggregation {
import SparkSetup._
def salesAnalytics(spark: SparkSession): Unit = {
import spark.implicits._
println("=== Sales Analytics ===")
// Sample sales data
val salesData = Seq(
("2023-01-01", "ProductA", "Electronics", 1200.0, 10, "North"),
("2023-01-01", "ProductB", "Books", 25.0, 50, "South"),
("2023-01-02", "ProductA", "Electronics", 1150.0, 8, "East"),
("2023-01-02", "ProductC", "Clothing", 75.0, 20, "West"),
("2023-01-03", "ProductB", "Books", 30.0, 15, "North"),
("2023-01-03", "ProductC", "Clothing", 80.0, 12, "South"),
("2023-01-04", "ProductA", "Electronics", 1250.0, 15, "West"),
("2023-01-04", "ProductD", "Electronics", 800.0, 25, "East"),
("2023-01-05", "ProductB", "Books", 28.0, 30, "North")
)
val salesDF = salesData.toDF(
"date", "product", "category", "price", "quantity", "region"
)
// Add calculated columns
val enrichedSalesDF = salesDF
.withColumn("revenue", col("price") * col("quantity"))
.withColumn("year", year(to_date(col("date"))))
.withColumn("month", month(to_date(col("date"))))
.withColumn("day_of_week", date_format(to_date(col("date")), "E"))
println("Enriched sales data:")
enrichedSalesDF.show()
// Daily sales summary
println("Daily sales summary:")
enrichedSalesDF
.groupBy("date")
.agg(
sum("revenue").as("total_revenue"),
sum("quantity").as("total_quantity"),
count("*").as("number_of_transactions"),
avg("price").as("average_price")
)
.orderBy("date")
.show()
// Product performance
println("Product performance:")
enrichedSalesDF
.groupBy("product", "category")
.agg(
sum("revenue").as("total_revenue"),
sum("quantity").as("total_quantity"),
avg("price").as("average_price"),
countDistinct("date").as("days_sold")
)
.orderBy(desc("total_revenue"))
.show()
// Regional analysis
println("Regional analysis:")
enrichedSalesDF
.groupBy("region")
.agg(
sum("revenue").as("total_revenue"),
sum("quantity").as("total_quantity"),
countDistinct("product").as("unique_products"),
avg("revenue").as("avg_transaction_value")
)
.orderBy(desc("total_revenue"))
.show()
// Pivot table: sales by region and category
println("Sales pivot by region and category:")
enrichedSalesDF
.groupBy("region")
.pivot("category")
.agg(sum("revenue"))
.na.fill(0)
.show()
// Rolling window calculations
import org.apache.spark.sql.expressions.Window
val windowSpec = Window
.partitionBy("product")
.orderBy("date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
println("Running totals per product:")
enrichedSalesDF
.withColumn("running_revenue", sum("revenue").over(windowSpec))
.withColumn("running_quantity", sum("quantity").over(windowSpec))
.select("product", "date", "revenue", "running_revenue", "quantity", "running_quantity")
.orderBy("product", "date")
.show()
}
}
// 5. Machine Learning with Spark ML
object SparkML {
import SparkSetup._
def simpleClassification(spark: SparkSession): Unit = {
import spark.implicits._
println("=== Simple Machine Learning Example ===")
// Create sample training data
val trainingData = Seq(
(1.0, "A", 10.0, 20.0),
(0.0, "B", 5.0, 15.0),
(1.0, "A", 12.0, 22.0),
(0.0, "B", 6.0, 14.0),
(1.0, "A", 11.0, 21.0),
(0.0, "B", 4.0, 13.0),
(1.0, "A", 13.0, 25.0),
(0.0, "B", 7.0, 16.0),
(1.0, "A", 9.0, 19.0),
(0.0, "B", 3.0, 12.0)
)
val df = trainingData.toDF("label", "category", "feature1", "feature2")
println("Training data:")
df.show()
// Convert string column to numeric
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexedDF = indexer.transform(df)
// Assemble features
val assembler = new VectorAssembler()
.setInputCols(Array("categoryIndex", "feature1", "feature2"))
.setOutputCol("features")
val assembledDF = assembler.transform(indexedDF)
println("Data with assembled features:")
assembledDF.select("label", "features").show(false)
// Split data
val Array(trainingDataSplit, testDataSplit) = assembledDF.randomSplit(Array(0.8, 0.2), seed = 12345)
// Train logistic regression model
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(10)
.setRegParam(0.01)
val lrModel = lr.fit(trainingDataSplit)
println("Model coefficients:")
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// Make predictions
val predictions = lrModel.transform(testDataSplit)
println("Predictions:")
predictions.select("label", "features", "probability", "prediction").show(false)
// Evaluate model
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
println(s"Area Under ROC: $auc")
}
}
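// Additional sketch (not part of the original example): the same three stages wired
// into a spark.ml Pipeline, so indexing, feature assembly and training run as one
// reusable estimator. Assumes a DataFrame shaped like trainingData above.
object SparkMLPipelineSketch {
  import org.apache.spark.ml.Pipeline
  def trainWithPipeline(spark: SparkSession, df: DataFrame): Unit = {
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
    val assembler = new VectorAssembler()
      .setInputCols(Array("categoryIndex", "feature1", "feature2"))
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)
    val pipeline = new Pipeline().setStages(Array(indexer, assembler, lr))
    val model = pipeline.fit(df) // fits each stage in order
    model.transform(df).select("label", "prediction").show()
  }
}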
// 6. Performance optimization and best practices
object Optimization {
import SparkSetup._
def optimizationTechniques(spark: SparkSession): Unit = {
import spark.implicits._
println("=== Spark Optimization Techniques ===")
// Create large dataset for optimization examples
val largeData = (1 to 100000).map { i =>
(i, s"item_$i", i % 100, i.toDouble / 100.0, i % 5 == 0)
}
val largeDF = largeData.toDF("id", "name", "category", "value", "is_premium")
// Cache commonly used DataFrame
println("Caching example:")
val startCache = System.currentTimeMillis()
largeDF.cache() // Cache in memory
// First action - triggers caching
val count1 = largeDF.count()
println(s"First count: $count1, Time: ${System.currentTimeMillis() - startCache}ms")
// Second action - uses cached data
val startCached = System.currentTimeMillis()
val count2 = largeDF.filter(col("category") === 50).count()
println(s"Second count: $count2, Time: ${System.currentTimeMillis() - startCached}ms")
// Partitioning example
println("\nPartitioning example:")
val partitionedDF = largeDF.repartition(col("category"))
println(s"Original partitions: ${largeDF.rdd.getNumPartitions}")
println(s"Repartitioned partitions: ${partitionedDF.rdd.getNumPartitions}")
// Coalesce for reducing partitions
val coalescedDF = largeDF.coalesce(4)
println(s"Coalesced partitions: ${coalescedDF.rdd.getNumPartitions}")
// Broadcast join example
println("\nBroadcast join example:")
val smallLookup = Seq((0, "Basic"), (1, "Premium"), (2, "VIP"))
.toDF("is_premium", "tier")
// Use broadcast for small lookup table
val joinedDF = largeDF.join(broadcast(smallLookup), "is_premium")
println("Broadcast join result:")
joinedDF.select("id", "name", "tier").show(10)
// Explain plans
println("\nQuery plan explanation:")
joinedDF.explain(true)
// Clean up
largeDF.unpersist()
}
}
// Main application to run all examples
object SparkExamples {
def main(args: Array[String]): Unit = {
val spark = SparkSetup.createSparkSession("Scala Spark Examples")
try {
// Run examples
DataFrameBasics.basicDataFrameOperations(spark)
println()
RDDBasics.basicRDDOperations(spark)
println()
AdvancedDataFrames.complexDataOperations(spark)
println()
DataAggregation.salesAnalytics(spark)
println()
SparkML.simpleClassification(spark)
println()
Optimization.optimizationTechniques(spark)
} catch {
case e: Exception =>
println(s"Error: ${e.getMessage}")
e.printStackTrace()
} finally {
spark.stop()
}
}
}