Scala Language Examples

Scala programming examples and functional programming concepts

💻 Scala Hello World scala

🟢 simple ⭐⭐

Scala Hello World programs and basic syntax examples

⏱️ 15 min 🏷️ scala, programming, beginner, functional, jvm
Prerequisites: Basic programming concepts, Understanding of JVM ecosystem
// Scala Hello World Examples

// 1. Basic Hello World
object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("Hello, World!")
  }
}

// 2. Hello World with App trait
object HelloWorldApp extends App {
  println("Hello, World from App!")
}

// 3. Hello World with function
def sayHello(): String = {
  "Hello, World!"
}

println(sayHello())

// 4. Hello World with function parameters
def greet(name: String): String = {
  s"Hello, $name!"
}

println(greet("World"))
println(greet("Scala"))

// 5. Hello World with class
class Greeter(message: String = "Hello, World!") {
  def greet(): String = message
}

val greeter = new Greeter()
println(greeter.greet())

val customGreeter = new Greeter("Hello from custom class!")
println(customGreeter.greet())

// 6. Hello World with case class
case class Person(name: String, greeting: String = "Hello") {
  def sayHello(): String = s"$greeting, $name!"
}

val person = Person("World")
println(person.sayHello())

val customPerson = Person("Scala", "Greetings")
println(customPerson.sayHello())

// 7. Hello World multiple times
object MultipleHellos extends App {
  for (i <- 1 to 5) {
    println(s"Hello, World! $i")
  }
}

// 8. Hello World with list
object ListHellos extends App {
  val greetings = List("Hello", "Bonjour", "Hola", "Ciao", "こんにちは")

  greetings.foreach { greeting =>
    println(s"$greeting, World!")
  }
}

// 9. Hello World with map
object MapHellos extends App {
  val greetings = Map(
    "en" -> "Hello",
    "es" -> "Hola",
    "fr" -> "Bonjour",
    "de" -> "Hallo",
    "ja" -> "こんにちは"
  )

  greetings.foreach { case (lang, greeting) =>
    println(s"$greeting, World! ($lang)")
  }
}

// 10. Hello World with pattern matching
object PatternMatchingHello extends App {
  def greetByTime(hour: Int): String = {
    hour match {
      case h if h < 12 => "Good morning"
      case h if h < 18 => "Good afternoon"
      case _ => "Good evening"
    }
  }

  val currentHour = 14
  println(s"${greetByTime(currentHour)}, World!")
}

// Basic data types examples
object DataTypes extends App {
  // Numbers
  val integer: Int = 42
  val long: Long = 1000000L
  val float: Float = 3.14f
  val double: Double = 2.71828

  // String
  val text: String = "Scala"

  // Boolean
  val isAwesome: Boolean = true

  // Collections
  val numbers: List[Int] = List(1, 2, 3, 4, 5)
  val fruits: List[String] = List("apple", "banana", "cherry")
  val scores: Map[String, Int] = Map(
    "math" -> 95,
    "science" -> 88,
    "english" -> 92
  )

  println(s"Integer: $integer, Type: ${integer.getClass.getSimpleName}")
  println(s"Double: $double, Type: ${double.getClass.getSimpleName}")
  println(s"String: $text, Type: ${text.getClass.getSimpleName}")
  println(s"Boolean: $isAwesome, Type: ${isAwesome.getClass.getSimpleName}")
  println(s"List: $numbers, Type: ${numbers.getClass.getSimpleName}")
  println(s"Map: $scores, Type: ${scores.getClass.getSimpleName}")
}

// Control flow examples
object ControlFlow extends App {
  val age = 18

  // If-else expression
  val message = if (age >= 18) {
    "You are an adult"
  } else {
    "You are a minor"
  }
  println(message)

  // Match expression (Scala's switch)
  val grade = 'A'
  val gradeMessage = grade match {
    case 'A' => "Excellent!"
    case 'B' => "Good!"
    case 'C' => "Fair"
    case _   => "Needs improvement"
  }
  println(gradeMessage)

  // For comprehension
  val fruits = List("apple", "banana", "cherry")
  for (fruit <- fruits) {
    println(s"I like $fruit")
  }

  // While loop
  var count = 0
  while (count < 3) {
    println(s"Count: $count")
    count += 1
  }
}

// String interpolation and operations
object StringOperations extends App {
  val name = "Scala"
  val version = 3
  val rating = 4.5

  println(s"Language: $name")
  println(s"Version: $version")
  println(s"Rating: $rating")
  println(s"Formatted: Language $name version $version has rating $rating")

  // f-interpolator for formatted strings
  println(f"Pi: ${math.Pi}%.2f")
  println(f"Number: ${42}%05d")

  // String methods
  val text = "Hello Scala World"
  println(s"Original: $text")
  println(s"Upper: ${text.toUpperCase}")
  println(s"Lower: ${text.toLowerCase}")
  println(s"Words: ${text.split(" ").mkString("[", "][", "]")}")
  println(s"Length: ${text.length}")
}

// Option type for null safety
object OptionExamples extends App {
  def findUser(id: Int): Option[String] = {
    if (id > 0) Some(s"User$id") else None
  }

  val user1 = findUser(1)
  val user2 = findUser(-1)

  // Pattern matching on Option
  user1 match {
    case Some(name) => println(s"Found user: $name")
    case None       => println("User not found")
  }

  // Using getOrElse
  println(s"User2 name: ${user2.getOrElse("Unknown")}")

  // Using map and flatMap
  val userNameLength = user1.map(_.length)
  println(s"User1 name length: $userNameLength")
}

// Tuple examples
object TupleExamples extends App {
  // Creating tuples
  val person: (String, Int) = ("John", 25)
  val coordinates = (10.5, 20.3)
  val triple = ("Alice", 30, "Engineer")

  // Accessing tuple elements
  println(s"Name: ${person._1}, Age: ${person._2}")
  println(s"X: ${coordinates._1}, Y: ${coordinates._2}")

  // Pattern matching on tuples
  val result = person match {
    case (name, age) if age < 18 => s"$name is a minor"
    case (name, age)             => s"$name is ${age} years old"
  }
  println(result)
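
  // Supplementary example: tuples can also be destructured into named values
  val (x, y) = coordinates
  println(s"Destructured: x=$x, y=$y")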
}

// Basic functions as values
object FunctionValues extends App {
  // Function as a value
  val add: (Int, Int) => Int = (a, b) => a + b
  val multiply: (Int, Int) => Int = (a, b) => a * b

  println(s"5 + 3 = ${add(5, 3)}")
  println(s"5 * 3 = ${multiply(5, 3)}")

  // Higher-order function
  def applyOperation(a: Int, b: Int, operation: (Int, Int) => Int): Int = {
    operation(a, b)
  }

  println(s"Apply addition: ${applyOperation(10, 5, add)}")
  println(s"Apply multiplication: ${applyOperation(10, 5, multiply)}")

  // Function composition
  val addOne: Int => Int = _ + 1
  val multiplyByTwo: Int => Int = _ * 2
  val composedFunction = addOne.andThen(multiplyByTwo)

  println(s"Composed function (5): ${composedFunction(5)}")
}

💻 Scala Functional Programming scala

🟡 intermediate ⭐⭐⭐⭐

Advanced functional programming concepts: immutability, higher-order functions, and the type system

⏱️ 30 min 🏷️ scala, functional, monads, immutability, type system
Prerequisites: Scala basics, Understanding of functional programming concepts
// Scala Functional Programming Examples

import scala.util.{Try, Success, Failure}

// 1. Immutability and immutable collections
object ImmutabilityExamples extends App {
  // Immutable list operations
  val numbers = List(1, 2, 3, 4, 5)
  val doubledNumbers = numbers.map(_ * 2)
  val evenNumbers = numbers.filter(_ % 2 == 0)
  val sum = numbers.foldLeft(0)(_ + _)

  println(s"Original: $numbers")
  println(s"Doubled: $doubledNumbers")
  println(s"Even numbers: $evenNumbers")
  println(s"Sum: $sum")

  // Immutable Map operations
  val scores = Map("Alice" -> 95, "Bob" -> 87, "Charlie" -> 92)
  val updatedScores = scores + ("David" -> 88)
  val charlieScore = scores.get("Charlie")

  println(s"Scores: $scores")
  println(s"Updated scores: $updatedScores")
  println(s"Charlie's score: $charlieScore")
}

// 2. Higher-order functions
object HigherOrderFunctions extends App {
  // Function that takes a function as parameter
  def processList[A, B](list: List[A])(operation: A => B): List[B] = {
    list.map(operation)
  }

  def processListWithPredicate[A](
    list: List[A]
  )(predicate: A => Boolean)(operation: A => A): List[A] = {
    list.filter(predicate).map(operation)
  }

  val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  // Using processList
  val strings = processList(numbers)(_.toString)
  println(s"Numbers as strings: $strings")

  // Using processListWithPredicate
  val doubledEvens = processListWithPredicate(numbers)(_ % 2 == 0)(_ * 2)
  println(s"Doubled even numbers: $doubledEvens")

  // Function that returns a function (currying)
  def multiplier(factor: Int): Int => Int = {
    (x: Int) => x * factor
  }

  val doubler = multiplier(2)
  val tripler = multiplier(3)

  println(s"Doubler(5): ${doubler(5)}")
  println(s"Tripler(5): ${tripler(5)}")

  // Curried function
  def add(a: Int)(b: Int): Int = a + b

  val addFive = add(5) _ // Scala 2 eta-expansion syntax; in Scala 3, plain add(5) suffices
  println(s"Add 5 to 10: ${addFive(10)}")

  // Function composition
  val addOne: Int => Int = _ + 1
  val multiplyByTwo: Int => Int = _ * 2
  val intToString: Int => String = _.toString // a member named toString would clash with Any.toString

  val composed1 = addOne.andThen(multiplyByTwo)
  val composed2 = addOne.compose(multiplyByTwo)
  val composed3 = intToString.compose(addOne).compose(multiplyByTwo)

  println(s"Composed1 (5 -> add -> multiply): ${composed1(5)}")
  println(s"Composed2 (5 -> multiply -> add): ${composed2(5)}")
  println(s"Composed3 (5 -> multiply -> add -> string): ${composed3(5)}")
}

// 3. Pattern matching advanced
object AdvancedPatternMatching extends App {
  // Matching on case classes
  sealed trait Shape
  case class Circle(radius: Double) extends Shape
  case class Rectangle(width: Double, height: Double) extends Shape
  case class Triangle(base: Double, height: Double) extends Shape

  def calculateArea(shape: Shape): Double = shape match {
    case Circle(radius) => math.Pi * radius * radius
    case Rectangle(width, height) => width * height
    case Triangle(base, height) => (base * height) / 2
  }

  val shapes = List(
    Circle(5.0),
    Rectangle(4.0, 6.0),
    Triangle(3.0, 4.0)
  )

  shapes.foreach { shape =>
    println(s"Shape: $shape, Area: ${calculateArea(shape)}")
  }

  // Pattern matching with guards
  def describeNumber(num: Int): String = num match {
    case n if n < 0      => s"$n is negative"
    case 0               => "Zero"
    case n if n < 10     => s"$n is a single digit"
    case n if n % 2 == 0 => s"$n is even"
    case n               => s"$n is odd"
  }

  val numbers = List(-5, 0, 7, 8, 15)
  numbers.foreach(n => println(s"$n: ${describeNumber(n)}"))

  // Matching on collections
  def analyzeList(list: List[Int]): String = list match {
    case Nil                    => "Empty list"
    case head :: Nil            => s"Single element: $head"
    case first :: second :: Nil => s"Two elements: $first, $second"
    case first :: rest          => s"First: $first, rest: ${rest.size} elements"
  }

  val lists = List(Nil, List(1), List(1, 2), List(1, 2, 3, 4))
  lists.foreach(lst => println(s"$lst -> ${analyzeList(lst)}"))
}

// 4. Either and Try for error handling
object ErrorHandling extends App {
  // Either for handling errors that can be recovered
  def divideEither(a: Int, b: Int): Either[String, Double] = {
    if (b == 0) Left("Division by zero")
    else Right(a.toDouble / b)
  }

  def sqrtEither(x: Double): Either[String, Double] = {
    if (x < 0) Left("Cannot calculate square root of negative number")
    else Right(math.sqrt(x))
  }

  // Composing Either operations
  def divideAndSqrt(a: Int, b: Int): Either[String, Double] = {
    for {
      result <- divideEither(a, b)
      sqrtResult <- sqrtEither(result)
    } yield sqrtResult
  }

  val result1 = divideAndSqrt(16, 2)  // Success
  val result2 = divideAndSqrt(16, 0)  // Division by zero
  val result3 = divideAndSqrt(-16, 2) // Square root of negative

  println(s"Result1: ${result1.getOrElse("Error")}")
  println(s"Result2: ${result2.left.getOrElse("Success")}")
  println(s"Result3: ${result3.left.getOrElse("Success")}")

  // Try for exception handling
  def divideTry(a: Int, b: Int): Try[Double] = {
    Try(a.toDouble / b)
  }

  def readFileTry(filename: String): Try[String] = {
    Try {
      val source = scala.io.Source.fromFile(filename)
      val content = source.mkString
      source.close()
      content
    }
  }

  // Working with Try
  val tryResult = divideTry(10, 2)
  val tryFailure = divideTry(10, 0)

  tryResult match {
    case Success(value) => println(s"Success: $value")
    case Failure(exception) => println(s"Failure: ${exception.getMessage}")
  }

  // Transforming Try
  val transformedTry = tryResult.map(_ * 2).filter(_ > 5).getOrElse(0.0)
  println(s"Transformed result: $transformedTry")
}
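
// Supplementary sketch: recovering from a failed Try and bridging Try-based
// and Either-based code. Assumes Scala 2.12+, where Try.toEither is available.
object ErrorRecovery extends App {
  def parseInt(s: String): Try[Int] = Try(s.toInt)

  // recover swaps a matching failure for a fallback value
  val recovered = parseInt("oops").recover { case _: NumberFormatException => 0 }
  println(s"Recovered: $recovered")  // Success(0)

  // toEither converts Success to Right and Failure to Left
  val asEither: Either[Throwable, Int] = parseInt("42").toEither
  println(s"As Either: $asEither")   // Right(42)
}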

// 5. Lazy evaluation
object LazyEvaluation extends App {
  // Lazy values
  lazy val expensiveComputation: Int = {
    println("Computing expensive value...")
    Thread.sleep(1000)  // Simulate expensive operation
    42
  }

  println("Before using lazy value")
  println(s"Lazy value: $expensiveComputation")
  println(s"Lazy value again: $expensiveComputation")

  // Infinite sequences using lazy evaluation
  // (LazyList replaces Stream, which is deprecated since Scala 2.13)
  def naturalNumbers: LazyList[Int] = 1 #:: naturalNumbers.map(_ + 1)

  val firstTenNumbers = naturalNumbers.take(10).toList
  println(s"First 10 natural numbers: $firstTenNumbers")

  def fibonacci: LazyList[Int] = {
    def fib(a: Int, b: Int): LazyList[Int] = a #:: fib(b, a + b)
    fib(0, 1)
  }

  val firstTenFibonacci = fibonacci.take(10).toList
  println(s"First 10 Fibonacci numbers: $firstTenFibonacci")

  // Lazy lists can be defined self-referentially (LazyList in Scala 2.13+):
  // the n-th element of this infinite sequence is n!
  lazy val lazyFactorials: LazyList[BigInt] =
    BigInt(1) #:: lazyFactorials.zip(LazyList.from(1)).map { case (f, n) => f * n }

  val firstFiveFactorials = lazyFactorials.take(5).toList
  println(s"First 5 factorials: $firstFiveFactorials")
}

// 6. Custom types and type classes
object CustomTypes extends App {
  // Custom algebraic data type
  sealed trait Weather
  case object Sunny extends Weather
  case object Cloudy extends Weather
  case object Rainy extends Weather
  case class Temperature(degrees: Double, scale: String)

  case class DayForecast(
    day: String,
    weather: Weather,
    temperature: Option[Temperature]
  )

  // Weather operations
  def getWeatherDescription(forecast: DayForecast): String = {
    val weatherDesc = forecast.weather match {
      case Sunny => "sunny"
      case Cloudy => "cloudy"
      case Rainy => "rainy"
    }

    val tempDesc = forecast.temperature match {
      case Some(Temperature(deg, "C")) => s"${deg}°C"
      case Some(Temperature(deg, "F")) => s"${deg}°F"
      case Some(Temperature(deg, _))   => s"${deg}°"
      case None                        => "temperature unknown"
    }

    s"${forecast.day} will be $weatherDesc with $tempDesc"
  }

  val forecasts = List(
    DayForecast("Monday", Sunny, Some(Temperature(25.0, "C"))),
    DayForecast("Tuesday", Cloudy, Some(Temperature(18.0, "C"))),
    DayForecast("Wednesday", Rainy, None)
  )

  forecasts.foreach(forecast => println(getWeatherDescription(forecast)))

  // Type class pattern
  trait Show[A] {
    def show(value: A): String
  }

  object Show {
    def apply[A](implicit show: Show[A]): Show[A] = show

    implicit val intShow: Show[Int] = new Show[Int] {
      def show(value: Int): String = s"Int: $value"
    }

    implicit val stringShow: Show[String] = new Show[String] {
      def show(value: String): String = s"String: '$value'"
    }

    implicit def optionShow[A](implicit showA: Show[A]): Show[Option[A]] =
      new Show[Option[A]] {
        def show(value: Option[A]): String = value match {
          case Some(a) => s"Some(${showA.show(a)})"
          case None    => "None"
        }
      }
  }

  def prettyPrint[A](value: A)(implicit show: Show[A]): Unit = {
    println(show.show(value))
  }

  prettyPrint(42)
  prettyPrint("Hello")
  prettyPrint(Option(123)) // typed Option[Int]; Some(123) would infer Show[Some[Int]] and miss the implicit
  prettyPrint(Option.empty[String])
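
  // Supplementary sketch: a context bound [A: Show] is shorthand for the
  // implicit parameter list used by prettyPrint above
  def prettyPrintBound[A: Show](value: A): Unit =
    println(implicitly[Show[A]].show(value))

  prettyPrintBound(7)
  prettyPrintBound("world")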
}

// 7. Fold and scan operations
object FoldOperations extends App {
  val numbers = List(1, 2, 3, 4, 5)

  // Fold operations
  val sum = numbers.foldLeft(0)(_ + _)
  val product = numbers.foldRight(1)(_ * _)
  val reversed = numbers.foldLeft(List.empty[Int])((acc, x) => x :: acc)

  println(s"Sum: $sum")
  println(s"Product: $product")
  println(s"Reversed: $reversed")

  // Scan operations
  val runningSum = numbers.scanLeft(0)(_ + _)
  val runningProduct = numbers.scanRight(1)(_ * _)

  println(s"Running sum: $runningSum")
  println(s"Running product: $runningProduct")

  // Complex fold example - word count
  val text = "hello world hello scala world programming"
  val words = text.split("\\s+").toList

  val wordCount = words.foldLeft(Map.empty[String, Int]) { (acc, word) =>
    acc + (word -> (acc.getOrElse(word, 0) + 1))
  }

  println(s"Word count: $wordCount")

  // Using fold to group by length
  val wordsByLength = words.groupBy(_.length)
  println(s"Words by length: $wordsByLength")
}

// 8. Monads and monadic operations
object MonadExamples extends App {
  // List monad
  val lists = List(1, 2, 3).map(i => List(i, i * 2)).flatten
  val listsComprehension = for {
    i <- List(1, 2, 3)
    j <- List(i, i * 2)
  } yield j

  println(s"Flattened lists: $lists")
  println(s"For comprehension: $listsComprehension")

  // Option monad
  val users = Map(
    1 -> "Alice",
    2 -> "Bob",
    3 -> "Charlie"
  )

  val roles = Map(
    "Alice" -> "Admin",
    "Bob" -> "User",
    "Charlie" -> "Moderator"
  )

  def getUserRole(userId: Int): Option[String] = {
    for {
      name <- users.get(userId)
      role <- roles.get(name)
    } yield role
  }

  println(s"Role for user 1: ${getUserRole(1)}")
  println(s"Role for user 4: ${getUserRole(4)}")

  // Custom monadic type
  case class Maybe[A](value: Option[A]) {
    def flatMap[B](f: A => Maybe[B]): Maybe[B] = {
      Maybe(value.flatMap(a => f(a).value))
    }

    def map[B](f: A => B): Maybe[B] = {
      Maybe(value.map(f))
    }

    def filter(p: A => Boolean): Maybe[A] = {
      Maybe(value.filter(p))
    }

    def getOrElse(default: A): A = value.getOrElse(default)
  }

  object Maybe {
    def apply[A](value: A): Maybe[A] = Maybe(Some(value))
    def empty[A]: Maybe[A] = Maybe(None)
  }

  def safeDivide(a: Int, b: Int): Maybe[Int] = {
    if (b == 0) Maybe.empty[Int]
    else Maybe(a / b)
  }

  val result = for {
    x <- safeDivide(10, 2)
    y <- safeDivide(x, 5)
  } yield y

  println(s"Safe division result: ${result.getOrElse(-1)}")
}

💻 Scala Big Data with Apache Spark scala

🔴 complex ⭐⭐⭐⭐⭐

Examples of big data processing with Apache Spark and Scala

⏱️ 45 min 🏷️ scala, spark, big data, analytics, distributed computing
Prerequisites: Scala basics, Understanding of distributed computing, Basic data analytics concepts
// Scala for Big Data with Apache Spark Examples

// Note: These examples require Apache Spark dependencies
// Add to build.sbt:
// libraryDependencies += "org.apache.spark" %% "spark-core"  % "3.4.0"
// libraryDependencies += "org.apache.spark" %% "spark-sql"   % "3.4.0"
// libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.0"  // for the Spark ML examples below

import org.apache.spark.sql.{SparkSession, DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

object SparkSetup {
  // Initialize Spark Session
  def createSparkSession(appName: String): SparkSession = {
    SparkSession.builder()
      .appName(appName)
      .master("local[*]")  // Use all available cores locally
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .getOrCreate()
  }
}

// 1. Basic DataFrame operations
object DataFrameBasics {
  import SparkSetup._

  def basicDataFrameOperations(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Basic DataFrame Operations ===")

    // Create a DataFrame from a sequence
    val data = Seq(
      ("Alice", 34, "Engineering"),
      ("Bob", 45, "Marketing"),
      ("Charlie", 29, "Engineering"),
      ("Diana", 38, "Sales"),
      ("Eve", 31, "Marketing")
    )

    val df = data.toDF("name", "age", "department")

    println("Original DataFrame:")
    df.show()

    // Basic transformations
    println("With uppercase names:")
    df.withColumn("name_upper", upper(col("name"))).show()

    println("Filtered by age > 30:")
    df.filter(col("age") > 30).show()

    println("Grouped by department:")
    df.groupBy("department")
      .agg(
        count("*").as("count"),
        avg("age").as("avg_age"),
        max("age").as("max_age"),
        min("age").as("min_age")
      )
      .show()

    // Sort operations
    println("Sorted by age descending:")
    df.orderBy(desc("age")).show()

    // SQL operations
    df.createOrReplaceTempView("employees")
    println("SQL query results:")
    spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department").show()
  }
}
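
// Supplementary sketch: the same kind of data as a typed Dataset. A case class
// gives compile-time checked field access instead of string column names; the
// case class must live outside the method so Spark can derive an encoder for it.
case class Employee(name: String, age: Int, department: String)

object DatasetBasics {
  def typedOperations(spark: SparkSession): Unit = {
    import spark.implicits._

    val ds = Seq(
      Employee("Alice", 34, "Engineering"),
      Employee("Bob", 45, "Marketing")
    ).toDS()

    // filter takes a plain Scala function; field access is type-checked
    ds.filter(_.age > 40).show()
  }
}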

// 2. RDD operations
object RDDBasics {
  import SparkSetup._

  def basicRDDOperations(spark: SparkSession): Unit = {
    println("=== Basic RDD Operations ===")

    // Create RDD from collection
    val numbers = spark.sparkContext.parallelize(1 to 100)

    // Basic transformations
    val squares = numbers.map(x => x * x)
    val evenSquares = squares.filter(_ % 2 == 0)
    val sumOfEvenSquares = evenSquares.reduce(_ + _)

    println(s"Sum of even squares of 1-100: $sumOfEvenSquares")

    // Word count example
    val text = spark.sparkContext.parallelize(Seq(
      "hello world hello spark",
      "spark is fast",
      "hello scala world",
      "big data processing"
    ))

    val wordCounts = text
      .flatMap(line => line.split("\\s+"))
      .map(word => (word.toLowerCase, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    println("Word counts:")
    wordCounts.collect().foreach { case (word, count) =>
      println(s"$word: $count")
    }

    // Group by key example
    val data = spark.sparkContext.parallelize(Seq(
      ("department1", "Alice"),
      ("department1", "Bob"),
      ("department2", "Charlie"),
      ("department1", "Diana"),
      ("department2", "Eve")
    ))

    val groupedData = data.groupByKey()

    println("Grouped by department:")
    groupedData.collect().foreach { case (dept, employees) =>
      println(s"$dept: ${employees.mkString(", ")}")
    }
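
    // Supplementary note: for aggregations, prefer reduceByKey over groupByKey;
    // it combines values on each partition before shuffling
    val employeesPerDept = data.mapValues(_ => 1).reduceByKey(_ + _)
    println(s"Employees per department: ${employeesPerDept.collect().toMap}")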

    // Custom partitioning
    val partitionedData = numbers.repartition(4)
    println(s"Number of partitions: ${partitionedData.getNumPartitions}")
  }
}

// 3. Advanced DataFrame operations with complex data types
object AdvancedDataFrames {
  import SparkSetup._

  def complexDataOperations(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Complex DataFrame Operations ===")

    // Create schema
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("tags", ArrayType(StringType), nullable = true),
      StructField("properties", MapType(StringType, StringType), nullable = true),
      StructField("created_at", TimestampType, nullable = false)
    ))

    // Create data with complex types
    val data = Seq(
      Row(1, "Product1", Array("electronics", "premium"),
          Map("brand" -> "BrandA", "category" -> "phones"),
          java.sql.Timestamp.valueOf("2023-01-15 10:30:00")),
      Row(2, "Product2", Array("books", "fiction"),
          Map("author" -> "AuthorX", "genre" -> "mystery"),
          java.sql.Timestamp.valueOf("2023-02-20 14:15:00")),
      Row(3, "Product3", Array("electronics", "budget"),
          Map("brand" -> "BrandB", "category" -> "tablets"),
          java.sql.Timestamp.valueOf("2023-03-10 09:45:00"))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    println("Complex DataFrame:")
    df.show(truncate = false)

    // Working with arrays
    println("Exploded tags:")
    df.select("id", "name", explode(col("tags")).as("tag")).show()

    // Working with maps
    println("Properties as separate columns:")
    df.select(
      col("id"), col("name"),
      col("properties")("brand").as("brand"),
      col("properties")("category").as("category")
    ).show()

    // Array operations
    println("Products with 'electronics' tag:")
    df.filter(array_contains(col("tags"), "electronics")).show()

    // JSON operations
    df.select(
      col("id"), col("name"),
      to_json(col("properties")).as("properties_json")
    ).show(false)

    // Window functions
    import org.apache.spark.sql.expressions.Window

    val windowSpec = Window.partitionBy("id").orderBy(desc("created_at"))

    // Add row numbers (useful for ranking)
    val withRowNum = df.withColumn("row_num", row_number().over(windowSpec))
    println("With row numbers:")
    withRowNum.show()

    // Date/time operations
    df.select(
      col("id"), col("name"),
      date_format(col("created_at"), "yyyy-MM-dd").as("date"),
      date_format(col("created_at"), "HH:mm:ss").as("time"),
      datediff(current_date(), col("created_at")).as("days_ago")
    ).show()
  }
}

// 4. Data aggregation and analytics
object DataAggregation {
  import SparkSetup._

  def salesAnalytics(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Sales Analytics ===")

    // Sample sales data
    val salesData = Seq(
      ("2023-01-01", "ProductA", "Electronics", 1200.0, 10, "North"),
      ("2023-01-01", "ProductB", "Books", 25.0, 50, "South"),
      ("2023-01-02", "ProductA", "Electronics", 1150.0, 8, "East"),
      ("2023-01-02", "ProductC", "Clothing", 75.0, 20, "West"),
      ("2023-01-03", "ProductB", "Books", 30.0, 15, "North"),
      ("2023-01-03", "ProductC", "Clothing", 80.0, 12, "South"),
      ("2023-01-04", "ProductA", "Electronics", 1250.0, 15, "West"),
      ("2023-01-04", "ProductD", "Electronics", 800.0, 25, "East"),
      ("2023-01-05", "ProductB", "Books", 28.0, 30, "North")
    )

    val salesDF = salesData.toDF(
      "date", "product", "category", "price", "quantity", "region"
    )

    // Add calculated columns
    val enrichedSalesDF = salesDF
      .withColumn("revenue", col("price") * col("quantity"))
      .withColumn("year", year(to_date(col("date"))))
      .withColumn("month", month(to_date(col("date"))))
      .withColumn("day_of_week", date_format(to_date(col("date")), "E"))

    println("Enriched sales data:")
    enrichedSalesDF.show()

    // Daily sales summary
    println("Daily sales summary:")
    enrichedSalesDF
      .groupBy("date")
      .agg(
        sum("revenue").as("total_revenue"),
        sum("quantity").as("total_quantity"),
        count("*").as("number_of_transactions"),
        avg("price").as("average_price")
      )
      .orderBy("date")
      .show()

    // Product performance
    println("Product performance:")
    enrichedSalesDF
      .groupBy("product", "category")
      .agg(
        sum("revenue").as("total_revenue"),
        sum("quantity").as("total_quantity"),
        avg("price").as("average_price"),
        countDistinct("date").as("days_sold")
      )
      .orderBy(desc("total_revenue"))
      .show()

    // Regional analysis
    println("Regional analysis:")
    enrichedSalesDF
      .groupBy("region")
      .agg(
        sum("revenue").as("total_revenue"),
        sum("quantity").as("total_quantity"),
        countDistinct("product").as("unique_products"),
        avg("revenue").as("avg_transaction_value")
      )
      .orderBy(desc("total_revenue"))
      .show()

    // Pivot table: sales by region and category
    println("Sales pivot by region and category:")
    enrichedSalesDF
      .groupBy("region")
      .pivot("category")
      .agg(sum("revenue"))
      .na.fill(0)
      .show()

    // Rolling window calculations
    import org.apache.spark.sql.expressions.Window

    val windowSpec = Window
      .partitionBy("product")
      .orderBy("date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    println("Running totals per product:")
    enrichedSalesDF
      .withColumn("running_revenue", sum("revenue").over(windowSpec))
      .withColumn("running_quantity", sum("quantity").over(windowSpec))
      .select("product", "date", "revenue", "running_revenue", "quantity", "running_quantity")
      .orderBy("product", "date")
      .show()
  }
}

// 5. Machine Learning with Spark ML
object SparkML {
  import SparkSetup._

  def simpleClassification(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Simple Machine Learning Example ===")

    // Create sample training data
    val trainingData = Seq(
      (1.0, "A", 10.0, 20.0),
      (0.0, "B", 5.0, 15.0),
      (1.0, "A", 12.0, 22.0),
      (0.0, "B", 6.0, 14.0),
      (1.0, "A", 11.0, 21.0),
      (0.0, "B", 4.0, 13.0),
      (1.0, "A", 13.0, 25.0),
      (0.0, "B", 7.0, 16.0),
      (1.0, "A", 9.0, 19.0),
      (0.0, "B", 3.0, 12.0)
    )

    val df = trainingData.toDF("label", "category", "feature1", "feature2")

    println("Training data:")
    df.show()

    // Convert string column to numeric
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
      .fit(df)

    val indexedDF = indexer.transform(df)

    // Assemble features
    val assembler = new VectorAssembler()
      .setInputCols(Array("categoryIndex", "feature1", "feature2"))
      .setOutputCol("features")

    val assembledDF = assembler.transform(indexedDF)

    println("Data with assembled features:")
    assembledDF.select("label", "features").show(false)

    // Split data
    val Array(trainingDataSplit, testDataSplit) = assembledDF.randomSplit(Array(0.8, 0.2), seed = 12345)

    // Train logistic regression model
    val lr = new LogisticRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)
      .setRegParam(0.01)

    val lrModel = lr.fit(trainingDataSplit)

    println("Model coefficients:")
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Make predictions
    val predictions = lrModel.transform(testDataSplit)

    println("Predictions:")
    predictions.select("label", "features", "probability", "prediction").show(false)

    // Evaluate model
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")

    val auc = evaluator.evaluate(predictions)
    println(s"Area Under ROC: $auc")
  }
}
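
// Supplementary sketch: the same stages chained with a Pipeline, the idiomatic
// way to keep feature transforms and the estimator together so the transform
// order is fixed at fit time. Reuses the imports at the top of this file.
object SparkMLPipeline {
  import org.apache.spark.ml.Pipeline

  def pipelineClassification(spark: SparkSession): Unit = {
    import spark.implicits._

    val df = Seq(
      (1.0, "A", 10.0, 20.0),
      (0.0, "B", 5.0, 15.0),
      (1.0, "A", 12.0, 22.0),
      (0.0, "B", 6.0, 14.0)
    ).toDF("label", "category", "feature1", "feature2")

    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
    val assembler = new VectorAssembler()
      .setInputCols(Array("categoryIndex", "feature1", "feature2"))
      .setOutputCol("features")
    val lr = new LogisticRegression().setMaxIter(10)

    // fitting the pipeline fits each stage in order and wires them together
    val pipeline = new Pipeline().setStages(Array(indexer, assembler, lr))
    val model = pipeline.fit(df)

    model.transform(df).select("label", "prediction").show()
  }
}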

// 6. Performance optimization and best practices
object Optimization {
  import SparkSetup._

  def optimizationTechniques(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Spark Optimization Techniques ===")

    // Create large dataset for optimization examples
    val largeData = (1 to 100000).map { i =>
      (i, s"item_$i", i % 100, i.toDouble / 100.0, i % 5 == 0)
    }

    val largeDF = largeData.toDF("id", "name", "category", "value", "is_premium")

    // Cache commonly used DataFrame
    println("Caching example:")
    val startCache = System.currentTimeMillis()

    largeDF.cache()  // Cache in memory

    // First action - triggers caching
    val count1 = largeDF.count()
    println(s"First count: $count1, Time: ${System.currentTimeMillis() - startCache}ms")

    // Second action - uses cached data
    val startCached = System.currentTimeMillis()
    val count2 = largeDF.filter(col("category") === 50).count()
    println(s"Second count: $count2, Time: ${System.currentTimeMillis() - startCached}ms")

    // Partitioning example
    println("\nPartitioning example:")
    val partitionedDF = largeDF.repartition(col("category"))
    println(s"Original partitions: ${largeDF.rdd.getNumPartitions}")
    println(s"Repartitioned partitions: ${partitionedDF.rdd.getNumPartitions}")

    // Coalesce for reducing partitions
    val coalescedDF = largeDF.coalesce(4)
    println(s"Coalesced partitions: ${coalescedDF.rdd.getNumPartitions}")

    // Broadcast join example
    println("\nBroadcast join example:")
    // is_premium is a Boolean in largeDF, so the lookup keys must be Boolean too
    val smallLookup = Seq((false, "Basic"), (true, "Premium"))
      .toDF("is_premium", "tier")

    // Broadcast the small lookup table so the join avoids shuffling largeDF
    val joinedDF = largeDF.join(broadcast(smallLookup), "is_premium")

    println("Broadcast join result:")
    joinedDF.select("id", "name", "tier").show(10)

    // Explain plans
    println("\nQuery plan explanation:")
    joinedDF.explain(true)

    // Clean up
    largeDF.unpersist()
  }
}
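
// Supplementary sketch: persist() with an explicit StorageLevel gives finer
// control than cache(); for Datasets, cache() is equivalent to
// persist(StorageLevel.MEMORY_AND_DISK).
object StorageLevels {
  import org.apache.spark.storage.StorageLevel

  def persistExample(spark: SparkSession): Unit = {
    import spark.implicits._

    val df = (1 to 1000).toDF("n")
    df.persist(StorageLevel.MEMORY_ONLY) // keep deserialized rows in memory only
    println(s"Count: ${df.count()}")     // first action materializes the cache
    df.unpersist()
  }
}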

// Main application to run all examples
object SparkExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSetup.createSparkSession("Scala Spark Examples")

    try {
      // Run examples
      DataFrameBasics.basicDataFrameOperations(spark)
      println()

      RDDBasics.basicRDDOperations(spark)
      println()

      AdvancedDataFrames.complexDataOperations(spark)
      println()

      DataAggregation.salesAnalytics(spark)
      println()

      SparkML.simpleClassification(spark)
      println()

      Optimization.optimizationTechniques(spark)

    } catch {
      case e: Exception =>
        println(s"Error: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}