Scala Language Samples

Essential Scala programming examples and functional programming concepts

Key Facts

Category: Programming Languages
Items: 3
Format Families: sample

Sample Overview

Essential Scala programming examples and functional programming concepts. This sample set belongs to the Programming Languages category and can be used to test related workflows inside Elysia Tools.

💻 Scala Hello World scala

🟢 simple ⭐⭐

Basic Scala Hello World program and fundamental syntax examples

⏱️ 15 min 🏷️ scala, programming, beginner, functional, jvm
Prerequisites: Basic programming concepts, Understanding of JVM ecosystem
// Scala Hello World Examples

// 1. Basic Hello World
object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("Hello, World!")
  }
}

// 2. Hello World with App trait
object HelloWorldApp extends App {
  println("Hello, World from App!")
}

// 3. Hello World with a function (this and the next few snippets run script-style, e.g. in the REPL)
def sayHello(): String = {
  "Hello, World!"
}

println(sayHello())

// 4. Hello World with function parameters
def greet(name: String): String = {
  s"Hello, $name!"
}

println(greet("World"))
println(greet("Scala"))

// 5. Hello World with class
class Greeter(message: String = "Hello, World!") {
  def greet(): String = message
}

val greeter = new Greeter()
println(greeter.greet())

val customGreeter = new Greeter("Hello from custom class!")
println(customGreeter.greet())

// 6. Hello World with case class
case class Person(name: String, greeting: String = "Hello") {
  def sayHello(): String = s"$greeting, $name!"
}

val person = Person("World")
println(person.sayHello())

val customPerson = Person("Scala", "Greetings")
println(customPerson.sayHello())

// 7. Hello World multiple times
object MultipleHellos extends App {
  for (i <- 1 to 5) {
    println(s"Hello, World! $i")
  }
}

// 8. Hello World with list
object ListHellos extends App {
  val greetings = List("Hello", "Bonjour", "Hola", "Ciao", "こんにちは")

  greetings.foreach { greeting =>
    println(s"$greeting, World!")
  }
}

// 9. Hello World with map
object MapHellos extends App {
  val greetings = Map(
    "en" -> "Hello",
    "es" -> "Hola",
    "fr" -> "Bonjour",
    "de" -> "Hallo",
    "ja" -> "こんにちは"
  )

  greetings.foreach { case (lang, greeting) =>
    println(s"$greeting, World! ($lang)")
  }
}

// 10. Hello World with pattern matching
object PatternMatchingHello extends App {
  def greetByTime(hour: Int): String = {
    hour match {
      case h if h < 12 => "Good morning"
      case h if h < 18 => "Good afternoon"
      case _ => "Good evening"
    }
  }

  val currentHour = 14
  println(s"${greetByTime(currentHour)}, World!")
}

// Basic data types examples
object DataTypes extends App {
  // Numbers
  val integer: Int = 42
  val long: Long = 1000000L
  val float: Float = 3.14f
  val double: Double = 2.71828

  // String
  val text: String = "Scala"

  // Boolean
  val isAwesome: Boolean = true

  // Collections
  val numbers: List[Int] = List(1, 2, 3, 4, 5)
  val fruits: List[String] = List("apple", "banana", "cherry")
  val scores: Map[String, Int] = Map(
    "math" -> 95,
    "science" -> 88,
    "english" -> 92
  )

  println(s"Integer: $integer, Type: ${integer.getClass.getSimpleName}")
  println(s"Double: $double, Type: ${double.getClass.getSimpleName}")
  println(s"String: $text, Type: ${text.getClass.getSimpleName}")
  println(s"Boolean: $isAwesome, Type: ${isAwesome.getClass.getSimpleName}")
  println(s"List: $numbers, Type: ${numbers.getClass.getSimpleName}")
  println(s"Map: $scores, Type: ${scores.getClass.getSimpleName}")
}

// Control flow examples
object ControlFlow extends App {
  val age = 18

  // If-else expression
  val message = if (age >= 18) {
    "You are an adult"
  } else {
    "You are a minor"
  }
  println(message)

  // Match expression (Scala's switch)
  val grade = 'A'
  val gradeMessage = grade match {
    case 'A' => "Excellent!"
    case 'B' => "Good!"
    case 'C' => "Fair"
    case _   => "Needs improvement"
  }
  println(gradeMessage)

  // For comprehension
  val fruits = List("apple", "banana", "cherry")
  for (fruit <- fruits) {
    println(s"I like $fruit")
  }
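
  // A for comprehension with yield builds a new collection instead of just side-effecting
  val upperFruits = for (fruit <- fruits) yield fruit.toUpperCase
  println(s"Uppercased: $upperFruits")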

  // While loop
  var count = 0
  while (count < 3) {
    println(s"Count: $count")
    count += 1
  }
}

// String interpolation and operations
object StringOperations extends App {
  val name = "Scala"
  val version = 3
  val rating = 4.5

  println(s"Language: $name")
  println(s"Version: $version")
  println(s"Rating: $rating")
  println(s"Formatted: Language $name version $version has rating $rating")

  // f-interpolator for formatted strings
  println(f"Pi: ${math.Pi}%.2f")
  println(f"Number: ${42}%05d")

  // String methods
  val text = "Hello Scala World"
  println(s"Original: $text")
  println(s"Upper: ${text.toUpperCase}")
  println(s"Lower: ${text.toLowerCase}")
  println(s"Words: ${text.split(" ").mkString("[", "][", "]")}")
  println(s"Length: ${text.length}")
}

// Option type for null safety
object OptionExamples extends App {
  def findUser(id: Int): Option[String] = {
    if (id > 0) Some(s"User$id") else None
  }

  val user1 = findUser(1)
  val user2 = findUser(-1)

  // Pattern matching on Option
  user1 match {
    case Some(name) => println(s"Found user: $name")
    case None       => println("User not found")
  }

  // Using getOrElse
  println(s"User2 name: ${user2.getOrElse("Unknown")}")

  // Using map and flatMap
  val userNameLength = user1.map(_.length)
  println(s"User1 name length: $userNameLength")
}

// Tuple examples
object TupleExamples extends App {
  // Creating tuples
  val person: Tuple2[String, Int] = ("John", 25)
  val coordinates = (10.5, 20.3)
  val triple = ("Alice", 30, "Engineer")

  // Accessing tuple elements
  println(s"Name: ${person._1}, Age: ${person._2}")
  println(s"X: ${coordinates._1}, Y: ${coordinates._2}")

  // Pattern matching on tuples
  val result = person match {
    case (name, age) if age < 18 => s"$name is a minor"
    case (name, age)             => s"$name is ${age} years old"
  }
  println(result)
}

// Basic functions as values
object FunctionValues extends App {
  // Function as a value
  val add: (Int, Int) => Int = (a, b) => a + b
  val multiply: (Int, Int) => Int = (a, b) => a * b

  println(s"5 + 3 = ${add(5, 3)}")
  println(s"5 * 3 = ${multiply(5, 3)}")

  // Higher-order function
  def applyOperation(a: Int, b: Int, operation: (Int, Int) => Int): Int = {
    operation(a, b)
  }

  println(s"Apply addition: ${applyOperation(10, 5, add)}")
  println(s"Apply multiplication: ${applyOperation(10, 5, multiply)}")

  // Function composition
  val addOne: Int => Int = _ + 1
  val multiplyByTwo: Int => Int = _ * 2
  val composedFunction = addOne.andThen(multiplyByTwo)

  println(s"Composed function (5): ${composedFunction(5)}")
}

💻 Scala Functional Programming scala

🟡 intermediate ⭐⭐⭐⭐

Advanced functional programming concepts: immutability, higher-order functions, and the type system

⏱️ 30 min 🏷️ scala, functional, monads, immutability, type system
Prerequisites: Scala basics, Understanding of functional programming concepts
// Scala Functional Programming Examples

import scala.util.{Try, Success, Failure}

// 1. Immutability and immutable collections
object ImmutabilityExamples extends App {
  // Immutable list operations
  val numbers = List(1, 2, 3, 4, 5)
  val doubledNumbers = numbers.map(_ * 2)
  val evenNumbers = numbers.filter(_ % 2 == 0)
  val sum = numbers.foldLeft(0)(_ + _)

  println(s"Original: $numbers")
  println(s"Doubled: $doubledNumbers")
  println(s"Even numbers: $evenNumbers")
  println(s"Sum: $sum")

  // Immutable Map operations
  val scores = Map("Alice" -> 95, "Bob" -> 87, "Charlie" -> 92)
  val updatedScores = scores + ("David" -> 88)
  val charlieScore = scores.get("Charlie")

  println(s"Scores: $scores")
  println(s"Updated scores: $updatedScores")
  println(s"Charlie's score: $charlieScore")
}

// 2. Higher-order functions
object HigherOrderFunctions extends App {
  // Function that takes a function as parameter
  def processList[A, B](list: List[A])(operation: A => B): List[B] = {
    list.map(operation)
  }

  def processListWithPredicate[A](
    list: List[A]
  )(predicate: A => Boolean)(operation: A => A): List[A] = {
    list.filter(predicate).map(operation)
  }

  val numbers = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  // Using processList
  val strings = processList(numbers)(_.toString)
  println(s"Numbers as strings: $strings")

  // Using processListWithPredicate
  val doubledEvens = processListWithPredicate(numbers)(_ % 2 == 0)(_ * 2)
  println(s"Doubled even numbers: $doubledEvens")

  // Function that returns a function (a closure over factor)
  def multiplier(factor: Int): Int => Int = {
    (x: Int) => x * factor
  }

  val doubler = multiplier(2)
  val tripler = multiplier(3)

  println(s"Doubler(5): ${doubler(5)}")
  println(s"Tripler(5): ${tripler(5)}")

  // Curried function
  def add(a: Int)(b: Int): Int = a + b

  val addFive = add(5) _
  println(s"Add 5 to 10: ${addFive(10)}")

  // Function composition
  val addOne: Int => Int = _ + 1
  val multiplyByTwo: Int => Int = _ * 2
  val intToString: Int => String = _.toString

  val composed1 = addOne.andThen(multiplyByTwo)
  val composed2 = addOne.compose(multiplyByTwo)
  val composed3 = intToString.compose(addOne).compose(multiplyByTwo)

  println(s"Composed1 (5 -> add -> multiply): ${composed1(5)}")
  println(s"Composed2 (5 -> multiply -> add): ${composed2(5)}")
  println(s"Composed3 (5 -> multiply -> add -> string): ${composed3(5)}")
}

// 3. Pattern matching advanced
object AdvancedPatternMatching extends App {
  // Matching on case classes
  sealed trait Shape
  case class Circle(radius: Double) extends Shape
  case class Rectangle(width: Double, height: Double) extends Shape
  case class Triangle(base: Double, height: Double) extends Shape

  def calculateArea(shape: Shape): Double = shape match {
    case Circle(radius) => math.Pi * radius * radius
    case Rectangle(width, height) => width * height
    case Triangle(base, height) => (base * height) / 2
  }

  val shapes = List(
    Circle(5.0),
    Rectangle(4.0, 6.0),
    Triangle(3.0, 4.0)
  )

  shapes.foreach { shape =>
    println(s"Shape: $shape, Area: ${calculateArea(shape)}")
  }

  // Pattern matching with guards
  def describeNumber(num: Int): String = num match {
    case n if n < 0      => s"$n is negative"
    case 0               => "Zero"
    case n if n < 10     => s"$n is a single digit"
    case n if n % 2 == 0 => s"$n is even"
    case n               => s"$n is odd"
  }

  val numbers = List(-5, 0, 7, 8, 15)
  numbers.foreach(n => println(s"$n: ${describeNumber(n)}"))

  // Matching on collections
  def analyzeList(list: List[Int]): String = list match {
    case Nil                    => "Empty list"
    case head :: Nil            => s"Single element: $head"
    case first :: second :: Nil => s"Two elements: $first, $second"
    case first :: rest          => s"First: $first, rest: ${rest.size} elements"
  }

  val lists = List(Nil, List(1), List(1, 2), List(1, 2, 3, 4))
  lists.foreach(lst => println(s"$lst -> ${analyzeList(lst)}"))
}

// 4. Either and Try for error handling
object ErrorHandling extends App {
  // Either for handling errors that can be recovered
  def divideEither(a: Int, b: Int): Either[String, Double] = {
    if (b == 0) Left("Division by zero")
    else Right(a.toDouble / b)
  }

  def sqrtEither(x: Double): Either[String, Double] = {
    if (x < 0) Left("Cannot calculate square root of negative number")
    else Right(math.sqrt(x))
  }

  // Composing Either operations
  def divideAndSqrt(a: Int, b: Int): Either[String, Double] = {
    for {
      result <- divideEither(a, b)
      sqrtResult <- sqrtEither(result)
    } yield sqrtResult
  }

  val result1 = divideAndSqrt(16, 2)  // Success
  val result2 = divideAndSqrt(16, 0)  // Division by zero
  val result3 = divideAndSqrt(-16, 2) // Square root of negative

  println(s"Result1: ${result1.getOrElse("Error")}")
  println(s"Result2: ${result2.left.getOrElse("Success")}")
  println(s"Result3: ${result3.left.getOrElse("Success")}")

  // Try for exception handling
  def divideTry(a: Int, b: Int): Try[Double] = {
    Try((a / b).toDouble) // integer division throws ArithmeticException when b == 0
  }

  def readFileTry(filename: String): Try[String] = {
    Try {
      val source = scala.io.Source.fromFile(filename)
      val content = source.mkString
      source.close()
      content
    }
  }

  // Working with Try
  val tryResult = divideTry(10, 2)
  val tryFailure = divideTry(10, 0)

  tryResult match {
    case Success(value) => println(s"Success: $value")
    case Failure(exception) => println(s"Failure: ${exception.getMessage}")
  }

  // Transforming Try
  val transformedTry = tryResult.map(_ * 2).filter(_ > 5).getOrElse(0.0)
  println(s"Transformed result: $transformedTry")
}

// 5. Lazy evaluation
object LazyEvaluation extends App {
  // Lazy values
  lazy val expensiveComputation: Int = {
    println("Computing expensive value...")
    Thread.sleep(1000)  // Simulate expensive operation
    42
  }

  println("Before using lazy value")
  println(s"Lazy value: $expensiveComputation")
  println(s"Lazy value again: $expensiveComputation")

  // Infinite streams using lazy evaluation
  // (Stream is deprecated in Scala 2.13; LazyList, shown below, is the replacement)
  def naturalNumbers: Stream[Int] = 1 #:: naturalNumbers.map(_ + 1)

  val firstTenNumbers = naturalNumbers.take(10).toList
  println(s"First 10 natural numbers: $firstTenNumbers")

  def fibonacci: Stream[Int] = {
    def fib(a: Int, b: Int): Stream[Int] = a #:: fib(b, a + b)
    fib(0, 1)
  }

  val firstTenFibonacci = fibonacci.take(10).toList
  println(s"First 10 Fibonacci numbers: $firstTenFibonacci")

  // Lazy lists (LazyList in Scala 2.13+)
  lazy val lazyFactorials: LazyList[BigInt] =
    BigInt(1) #:: lazyFactorials.zip(LazyList.from(1)).map { case (f, n) => f * n }

  val firstFiveFactorials = lazyFactorials.take(5).toList
  println(s"First 5 factorials: $firstFiveFactorials")
}

// 6. Custom types and type classes
object CustomTypes extends App {
  // Custom algebraic data type
  sealed trait Weather
  case object Sunny extends Weather
  case object Cloudy extends Weather
  case object Rainy extends Weather
  case class Temperature(degrees: Double, scale: String)

  case class DayForecast(
    day: String,
    weather: Weather,
    temperature: Option[Temperature]
  )

  // Weather operations
  def getWeatherDescription(forecast: DayForecast): String = {
    val weatherDesc = forecast.weather match {
      case Sunny => "sunny"
      case Cloudy => "cloudy"
      case Rainy => "rainy"
    }

    val tempDesc = forecast.temperature match {
      case Some(Temperature(deg, "C")) => s"${deg}°C"
      case Some(Temperature(deg, "F")) => s"${deg}°F"
      case Some(Temperature(deg, _))   => s"${deg}°"
      case None                        => "temperature unknown"
    }

    s"${forecast.day} will be $weatherDesc with $tempDesc"
  }

  val forecasts = List(
    DayForecast("Monday", Sunny, Some(Temperature(25.0, "C"))),
    DayForecast("Tuesday", Cloudy, Some(Temperature(18.0, "C"))),
    DayForecast("Wednesday", Rainy, None)
  )

  forecasts.foreach(forecast => println(getWeatherDescription(forecast)))

  // Type class pattern
  trait Show[A] {
    def show(value: A): String
  }

  object Show {
    def apply[A](implicit show: Show[A]): Show[A] = show

    implicit val intShow: Show[Int] = new Show[Int] {
      def show(value: Int): String = s"Int: $value"
    }

    implicit val stringShow: Show[String] = new Show[String] {
      def show(value: String): String = s"String: '$value'"
    }

    implicit def optionShow[A](implicit showA: Show[A]): Show[Option[A]] =
      new Show[Option[A]] {
        def show(value: Option[A]): String = value match {
          case Some(a) => s"Some(${showA.show(a)})"
          case None    => "None"
        }
      }
  }

  def prettyPrint[A](value: A)(implicit show: Show[A]): Unit = {
    println(show.show(value))
  }

  prettyPrint(42)
  prettyPrint("Hello")
  prettyPrint(Option(123))
  prettyPrint(Option.empty[String])
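
  // Extension-method syntax via an implicit class, a common companion to the type class pattern
  implicit class ShowOps[A](private val value: A) {
    def show(implicit showA: Show[A]): String = showA.show(value)
  }

  println(42.show)
  println("type classes".show)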
}

// 7. Fold and scan operations
object FoldOperations extends App {
  val numbers = List(1, 2, 3, 4, 5)

  // Fold operations
  val sum = numbers.foldLeft(0)(_ + _)
  val product = numbers.foldRight(1)(_ * _)
  val reversed = numbers.foldLeft(List.empty[Int])((acc, x) => x :: acc)

  println(s"Sum: $sum")
  println(s"Product: $product")
  println(s"Reversed: $reversed")

  // Scan operations
  val runningSum = numbers.scanLeft(0)(_ + _)
  val runningProduct = numbers.scanRight(1)(_ * _)

  println(s"Running sum: $runningSum")
  println(s"Running product: $runningProduct")

  // Complex fold example - word count
  val text = "hello world hello scala world programming"
  val words = text.split("\\s+").toList

  val wordCount = words.foldLeft(Map.empty[String, Int]) { (acc, word) =>
    acc + (word -> (acc.getOrElse(word, 0) + 1))
  }

  println(s"Word count: $wordCount")

  // Using fold to group by length
  val wordsByLength = words.groupBy(_.length)
  println(s"Words by length: $wordsByLength")
}

// 8. Monads and monadic operations
object MonadExamples extends App {
  // List monad
  val lists = List(1, 2, 3).map(i => List(i, i * 2)).flatten
  val listsComprehension = for {
    i <- List(1, 2, 3)
    j <- List(i, i * 2)
  } yield j

  println(s"Flattened lists: $lists")
  println(s"For comprehension: $listsComprehension")

  // Option monad
  val users = Map(
    1 -> "Alice",
    2 -> "Bob",
    3 -> "Charlie"
  )

  val roles = Map(
    "Alice" -> "Admin",
    "Bob" -> "User",
    "Charlie" -> "Moderator"
  )

  def getUserRole(userId: Int): Option[String] = {
    for {
      name <- users.get(userId)
      role <- roles.get(name)
    } yield role
  }

  println(s"Role for user 1: ${getUserRole(1)}")
  println(s"Role for user 4: ${getUserRole(4)}")

  // Custom monadic type
  case class Maybe[A](value: Option[A]) {
    def flatMap[B](f: A => Maybe[B]): Maybe[B] = {
      Maybe(value.flatMap(a => f(a).value))
    }

    def map[B](f: A => B): Maybe[B] = {
      Maybe(value.map(f))
    }

    def filter(p: A => Boolean): Maybe[A] = {
      Maybe(value.filter(p))
    }

    def getOrElse(default: A): A = value.getOrElse(default)
  }

  object Maybe {
    def apply[A](value: A): Maybe[A] = Maybe(Some(value))
    def empty[A]: Maybe[A] = Maybe(None)
  }

  def safeDivide(a: Int, b: Int): Maybe[Int] = {
    if (b == 0) Maybe.empty[Int]
    else Maybe(a / b)
  }

  val result = for {
    x <- safeDivide(10, 2)
    y <- safeDivide(x, 5)
  } yield y

  println(s"Safe division result: ${result.getOrElse(-1)}")
}

💻 Scala for Big Data with Apache Spark scala

🔴 complex ⭐⭐⭐⭐⭐

Big data processing examples using Apache Spark and Scala for data analytics

⏱️ 45 min 🏷️ scala, spark, big data, analytics, distributed computing
Prerequisites: Scala basics, Understanding of distributed computing, Basic data analytics concepts
// Scala for Big Data with Apache Spark Examples

// Note: These examples require Apache Spark dependencies
// Add to build.sbt:
// libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.0"
// libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.0"

import org.apache.spark.sql.{SparkSession, DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

object SparkSetup {
  // Initialize Spark Session
  def createSparkSession(appName: String): SparkSession = {
    SparkSession.builder()
      .appName(appName)
      .master("local[*]")  // Use all available cores locally
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .getOrCreate()
  }
}

// 1. Basic DataFrame operations
object DataFrameBasics {
  import SparkSetup._

  def basicDataFrameOperations(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Basic DataFrame Operations ===")

    // Create a DataFrame from a sequence
    val data = Seq(
      ("Alice", 34, "Engineering"),
      ("Bob", 45, "Marketing"),
      ("Charlie", 29, "Engineering"),
      ("Diana", 38, "Sales"),
      ("Eve", 31, "Marketing")
    )

    val df = data.toDF("name", "age", "department")

    println("Original DataFrame:")
    df.show()

    // Basic transformations
    println("With uppercase names:")
    df.withColumn("name_upper", upper(col("name"))).show()

    println("Filtered by age > 30:")
    df.filter(col("age") > 30).show()

    println("Grouped by department:")
    df.groupBy("department")
      .agg(
        count("*").as("count"),
        avg("age").as("avg_age"),
        max("age").as("max_age"),
        min("age").as("min_age")
      )
      .show()

    // Sort operations
    println("Sorted by age descending:")
    df.orderBy(desc("age")).show()

    // SQL operations
    df.createOrReplaceTempView("employees")
    println("SQL query results:")
    spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department").show()
  }
}

// 2. RDD operations
object RDDBasics {
  import SparkSetup._

  def basicRDDOperations(spark: SparkSession): Unit = {
    println("=== Basic RDD Operations ===")

    // Create RDD from collection
    val numbers = spark.sparkContext.parallelize(1 to 100)

    // Basic transformations
    val squares = numbers.map(x => x * x)
    val evenSquares = squares.filter(_ % 2 == 0)
    val sumOfEvenSquares = evenSquares.reduce(_ + _)

    println(s"Sum of even squares of 1-100: $sumOfEvenSquares")

    // Word count example
    val text = spark.sparkContext.parallelize(Seq(
      "hello world hello spark",
      "spark is fast",
      "hello scala world",
      "big data processing"
    ))

    val wordCounts = text
      .flatMap(line => line.split("\\s+"))
      .map(word => (word.toLowerCase, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    println("Word counts:")
    wordCounts.collect().foreach { case (word, count) =>
      println(s"$word: $count")
    }

    // Group by key example
    val data = spark.sparkContext.parallelize(Seq(
      ("department1", "Alice"),
      ("department1", "Bob"),
      ("department2", "Charlie"),
      ("department1", "Diana"),
      ("department2", "Eve")
    ))

    val groupedData = data.groupByKey()

    println("Grouped by department:")
    groupedData.collect().foreach { case (dept, employees) =>
      println(s"$dept: ${employees.mkString(", ")}")
    }

    // Custom partitioning
    val partitionedData = numbers.repartition(4)
    println(s"Number of partitions: ${partitionedData.getNumPartitions}")
  }
}

// 3. Advanced DataFrame operations with complex data types
object AdvancedDataFrames {
  import SparkSetup._

  def complexDataOperations(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Complex DataFrame Operations ===")

    // Create schema
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("tags", ArrayType(StringType), nullable = true),
      StructField("properties", MapType(StringType, StringType), nullable = true),
      StructField("created_at", TimestampType, nullable = false)
    ))

    // Create data with complex types
    val data = Seq(
      Row(1, "Product1", Array("electronics", "premium"),
          Map("brand" -> "BrandA", "category" -> "phones"),
          java.sql.Timestamp.valueOf("2023-01-15 10:30:00")),
      Row(2, "Product2", Array("books", "fiction"),
          Map("author" -> "AuthorX", "genre" -> "mystery"),
          java.sql.Timestamp.valueOf("2023-02-20 14:15:00")),
      Row(3, "Product3", Array("electronics", "budget"),
          Map("brand" -> "BrandB", "category" -> "tablets"),
          java.sql.Timestamp.valueOf("2023-03-10 09:45:00"))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

    println("Complex DataFrame:")
    df.show(truncate = false)

    // Working with arrays
    println("Exploded tags:")
    df.select("id", "name", explode(col("tags")).as("tag")).show()

    // Working with maps
    println("Properties as separate columns:")
    df.select(
      col("id"), col("name"),
      col("properties")("brand").as("brand"),
      col("properties")("category").as("category")
    ).show()

    // Array operations
    println("Products with 'electronics' tag:")
    df.filter(array_contains(col("tags"), "electronics")).show()
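
    // Count the tags on each product
    df.select(col("id"), col("name"), size(col("tags")).as("tag_count")).show()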

    // JSON operations
    df.select(
      col("id"), col("name"),
      to_json(col("properties")).as("properties_json")
    ).show(false)

    // Window functions
    import org.apache.spark.sql.expressions.Window

    val windowSpec = Window.partitionBy("id").orderBy(desc("created_at"))

    // Add row numbers (useful for ranking)
    val withRowNum = df.withColumn("row_num", row_number().over(windowSpec))
    println("With row numbers:")
    withRowNum.show()

    // Date/time operations
    df.select(
      col("id"), col("name"),
      date_format(col("created_at"), "yyyy-MM-dd").as("date"),
      date_format(col("created_at"), "HH:mm:ss").as("time"),
      datediff(current_date(), col("created_at")).as("days_ago")
    ).show()
  }
}

// 4. Data aggregation and analytics
object DataAggregation {
  import SparkSetup._

  def salesAnalytics(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Sales Analytics ===")

    // Sample sales data
    val salesData = Seq(
      ("2023-01-01", "ProductA", "Electronics", 1200.0, 10, "North"),
      ("2023-01-01", "ProductB", "Books", 25.0, 50, "South"),
      ("2023-01-02", "ProductA", "Electronics", 1150.0, 8, "East"),
      ("2023-01-02", "ProductC", "Clothing", 75.0, 20, "West"),
      ("2023-01-03", "ProductB", "Books", 30.0, 15, "North"),
      ("2023-01-03", "ProductC", "Clothing", 80.0, 12, "South"),
      ("2023-01-04", "ProductA", "Electronics", 1250.0, 15, "West"),
      ("2023-01-04", "ProductD", "Electronics", 800.0, 25, "East"),
      ("2023-01-05", "ProductB", "Books", 28.0, 30, "North")
    )

    val salesDF = salesData.toDF(
      "date", "product", "category", "price", "quantity", "region"
    )

    // Add calculated columns
    val enrichedSalesDF = salesDF
      .withColumn("revenue", col("price") * col("quantity"))
      .withColumn("year", year(to_date(col("date"))))
      .withColumn("month", month(to_date(col("date"))))
      .withColumn("day_of_week", date_format(to_date(col("date")), "E"))

    println("Enriched sales data:")
    enrichedSalesDF.show()

    // Daily sales summary
    println("Daily sales summary:")
    enrichedSalesDF
      .groupBy("date")
      .agg(
        sum("revenue").as("total_revenue"),
        sum("quantity").as("total_quantity"),
        count("*").as("number_of_transactions"),
        avg("price").as("average_price")
      )
      .orderBy("date")
      .show()

    // Product performance
    println("Product performance:")
    enrichedSalesDF
      .groupBy("product", "category")
      .agg(
        sum("revenue").as("total_revenue"),
        sum("quantity").as("total_quantity"),
        avg("price").as("average_price"),
        countDistinct("date").as("days_sold")
      )
      .orderBy(desc("total_revenue"))
      .show()

    // Regional analysis
    println("Regional analysis:")
    enrichedSalesDF
      .groupBy("region")
      .agg(
        sum("revenue").as("total_revenue"),
        sum("quantity").as("total_quantity"),
        countDistinct("product").as("unique_products"),
        avg("revenue").as("avg_transaction_value")
      )
      .orderBy(desc("total_revenue"))
      .show()

    // Pivot table: sales by region and category
    println("Sales pivot by region and category:")
    enrichedSalesDF
      .groupBy("region")
      .pivot("category")
      .agg(sum("revenue"))
      .na.fill(0)
      .show()

    // Rolling window calculations
    import org.apache.spark.sql.expressions.Window

    val windowSpec = Window
      .partitionBy("product")
      .orderBy("date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    println("Running totals per product:")
    enrichedSalesDF
      .withColumn("running_revenue", sum("revenue").over(windowSpec))
      .withColumn("running_quantity", sum("quantity").over(windowSpec))
      .select("product", "date", "revenue", "running_revenue", "quantity", "running_quantity")
      .orderBy("product", "date")
      .show()
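
    // lag looks back one row in the window: each transaction next to the previous one for the same product
    val lagWindow = Window.partitionBy("product").orderBy("date")

    println("Revenue vs. previous transaction per product:")
    enrichedSalesDF
      .withColumn("prev_revenue", lag("revenue", 1).over(lagWindow))
      .select("product", "date", "revenue", "prev_revenue")
      .orderBy("product", "date")
      .show()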
  }
}

// 5. Machine Learning with Spark ML
object SparkML {
  import SparkSetup._

  def simpleClassification(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Simple Machine Learning Example ===")

    // Create sample training data
    val trainingData = Seq(
      (1.0, "A", 10.0, 20.0),
      (0.0, "B", 5.0, 15.0),
      (1.0, "A", 12.0, 22.0),
      (0.0, "B", 6.0, 14.0),
      (1.0, "A", 11.0, 21.0),
      (0.0, "B", 4.0, 13.0),
      (1.0, "A", 13.0, 25.0),
      (0.0, "B", 7.0, 16.0),
      (1.0, "A", 9.0, 19.0),
      (0.0, "B", 3.0, 12.0)
    )

    val df = trainingData.toDF("label", "category", "feature1", "feature2")

    println("Training data:")
    df.show()

    // Convert string column to numeric
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
      .fit(df)

    val indexedDF = indexer.transform(df)

    // Assemble features
    val assembler = new VectorAssembler()
      .setInputCols(Array("categoryIndex", "feature1", "feature2"))
      .setOutputCol("features")

    val assembledDF = assembler.transform(indexedDF)

    println("Data with assembled features:")
    assembledDF.select("label", "features").show(false)

    // Split data
    val Array(trainingDataSplit, testDataSplit) = assembledDF.randomSplit(Array(0.8, 0.2), seed = 12345)

    // Train logistic regression model
    val lr = new LogisticRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)
      .setRegParam(0.01)

    val lrModel = lr.fit(trainingDataSplit)

    println("Model coefficients:")
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Make predictions
    val predictions = lrModel.transform(testDataSplit)

    println("Predictions:")
    predictions.select("label", "features", "probability", "prediction").show(false)

    // Evaluate model
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")

    val auc = evaluator.evaluate(predictions)
    println(s"Area Under ROC: $auc")
  }
}

// 6. Performance optimization and best practices
object Optimization {
  import SparkSetup._

  def optimizationTechniques(spark: SparkSession): Unit = {
    import spark.implicits._

    println("=== Spark Optimization Techniques ===")

    // Create large dataset for optimization examples
    val largeData = (1 to 100000).map { i =>
      (i, s"item_$i", i % 100, i.toDouble / 100.0, i % 5 == 0)
    }

    val largeDF = largeData.toDF("id", "name", "category", "value", "is_premium")

    // Cache commonly used DataFrame
    println("Caching example:")
    val startCache = System.currentTimeMillis()

    largeDF.cache()  // Cache in memory

    // First action - triggers caching
    val count1 = largeDF.count()
    println(s"First count: $count1, Time: ${System.currentTimeMillis() - startCache}ms")

    // Second action - uses cached data
    val startCached = System.currentTimeMillis()
    val count2 = largeDF.filter(col("category") === 50).count()
    println(s"Second count: $count2, Time: ${System.currentTimeMillis() - startCached}ms")

    // Partitioning example
    println("\nPartitioning example:")
    val partitionedDF = largeDF.repartition(col("category"))
    println(s"Original partitions: ${largeDF.rdd.getNumPartitions}")
    println(s"Repartitioned partitions: ${partitionedDF.rdd.getNumPartitions}")

    // Coalesce for reducing partitions
    val coalescedDF = largeDF.coalesce(4)
    println(s"Coalesced partitions: ${coalescedDF.rdd.getNumPartitions}")

    // Broadcast join example
    println("\nBroadcast join example:")
    val smallLookup = Seq((true, "Premium"), (false, "Basic"))
      .toDF("is_premium", "tier")

    // Use broadcast for small lookup table
    val joinedDF = largeDF.join(broadcast(smallLookup), "is_premium")

    println("Broadcast join result:")
    joinedDF.select("id", "name", "tier").show(10)

    // Explain plans
    println("\nQuery plan explanation:")
    joinedDF.explain(true)

    // Clean up
    largeDF.unpersist()
  }
}

// Main application to run all examples
object SparkExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSetup.createSparkSession("Scala Spark Examples")

    try {
      // Run examples
      DataFrameBasics.basicDataFrameOperations(spark)
      println()

      RDDBasics.basicRDDOperations(spark)
      println()

      AdvancedDataFrames.complexDataOperations(spark)
      println()

      DataAggregation.salesAnalytics(spark)
      println()

      SparkML.simpleClassification(spark)
      println()

      Optimization.optimizationTechniques(spark)

    } catch {
      case e: Exception =>
        println(s"Error: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}