
Parallel Collections and Streams | Scala Programming Guide

By Dmitri Meshin

Parallel Collections (.par) — Automatic Parallelism

Scala provides a deceptively simple way to parallelize collection operations: add .par to any collection and the operation automatically spreads across multiple threads. This is seductive in its simplicity—no thread management, no synchronization primitives, just append .par and suddenly you have parallelism. But this simplicity masks important details: parallel collections have significant overhead (thread creation, synchronization between chunks, result combination), so they only pay off for expensive operations on large datasets. Furthermore, the operation must be associative—the grouping of elements cannot affect the result, otherwise parallelism produces incorrect answers. And there are subtle gotchas: side effects in the operation become unpredictable when parallelized, making code that works sequentially break in parallel.

The mechanics: when you add .par, the collection is divided into chunks, each chunk is processed on a different worker thread of a shared fork/join pool, and the results are combined back into a collection. The overhead comes from dividing the work, scheduling tasks on the pool, synchronizing between threads, and combining results. For trivial operations (multiply by 2), this overhead vastly exceeds the savings from parallelization. For expensive operations (computing the prime factorization of a million numbers), the overhead is negligible compared to the savings. This is why parallel collections require careful thought: use them only when you're confident the benefit outweighs the overhead.

For these reasons, parallel collections are powerful but not a panacea: reach for them only when you have a sufficiently expensive, pure, associative operation over a large collection. The simplicity of .par makes it the easiest parallelism to reach for, but also the easiest to use incorrectly. Here's how parallel collections work and when they're beneficial:

// Scala 2.13+: .par requires the scala-parallel-collections module
import scala.collection.parallel.CollectionConverters._

// Sequential processing (baseline)
val numbers = (1 to 1000).toList
val result1 = numbers.map { n =>
  // Expensive computation
  (n until n + 100).map(i => Math.sqrt(i)).sum
}
// Takes roughly 1000 * 100 units of work on one thread

// Parallel processing
val result2 = numbers.par.map { n =>
  // Same computation, but spread across multiple threads
  (n until n + 100).map(i => Math.sqrt(i)).sum
}
// Takes roughly 1000 * 100 units of work divided by number of cores

Parallel collections split the work across threads automatically, transparently. The API is identical to sequential collections—just add .par. This transparency is both a strength (easy to parallelize) and a weakness (easy to parallelize incorrectly):

import scala.collection.parallel.CollectionConverters._

val largeList = (1 to 1000000).toList

// Sequential
val timeSeq = {
  val start = System.currentTimeMillis()
  val result = largeList.filter { n =>
    // Expensive check
    (1 to 1000).map(i => Math.pow(n, i % 10)).sum > 0
  }
  System.currentTimeMillis() - start
}
println(s"Sequential: ${timeSeq}ms")

// Parallel
val timePar = {
  val start = System.currentTimeMillis()
  val result = largeList.par.filter { n =>
    (1 to 1000).map(i => Math.pow(n, i % 10)).sum > 0
  }
  System.currentTimeMillis() - start
}
println(s"Parallel: ${timePar}ms")
// Output might be: Sequential: 5000ms, Parallel: 1200ms (4x speedup on quad-core)
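You can also control how much parallelism a collection uses. A minimal sketch, assuming Scala 2.13 with the scala-parallel-collections module: each parallel collection has a settable tasksupport, which lets you cap the number of worker threads instead of sharing the default global pool.

```scala
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool

val data = (1 to 100000).toList.par

// By default, parallel collections share a global fork/join pool.
// Assigning a custom task support caps this collection at 4 worker threads.
data.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

val total = data.map(n => math.sqrt(n.toDouble)).sum
```

Capping the pool is useful when parallel collections share a machine with other thread pools (web server, Akka dispatchers) and you want to bound their CPU footprint.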

When Parallel Collections Help (and When They Don't)

Parallelism always has costs: creating threads, synchronizing between chunks, combining partial results. These overhead costs are only worthwhile if the per-element work is substantial. If your operation is trivial (e.g., n * 2), the overhead of parallelization vastly exceeds the savings from spreading work across threads. Similarly, a small collection (10 elements) doesn't justify thread creation, but a million elements does. Even with a large collection and expensive operation, you must ensure the operation is pure (no side effects) and associative (grouping doesn't affect results). A function with side effects becomes nondeterministic when parallelized—it might execute in any order, with unpredictable interactions. A non-associative function (like subtraction) produces wrong answers when threads group elements differently.

The rule of thumb: parallelize if element count > 100,000 AND work per element > 1ms AND operation is pure AND operation is associative. Violate any of these and parallelism likely hurts more than helps.

Concretely, the overhead comes from dividing the collection, scheduling tasks on the thread pool, waiting for the slowest chunk to finish, and combining partial results. If the operation is cheap, that overhead dominates; if the collection is small, it isn't worth amortizing; and if the operation has side effects or is non-associative, parallelism produces wrong results. Understanding these tradeoffs is crucial for using parallel collections correctly.

// GOOD: Expensive per-element work
val heavyWork = (1 to 10000).par.map { n =>
  (1 to 10000).map(i => Math.sqrt(n * i)).sum
}
// Parallelism pays off: each element takes significant work

// BAD: Cheap per-element work
val cheapWork = (1 to 10000).par.map(_ * 2)
// Parallelism overhead exceeds benefit: work is trivial

// GOOD: Significant collection
// (isPrime is a simple trial-division helper, defined here for completeness)
def isPrime(n: Int): Boolean =
  n > 1 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)
val manyElements = (1 to 10000000).par.filter(n => isPrime(n))
// Parallelism helps with sheer volume

// BAD: Small collection
val fewElements = (1 to 10).par.filter(n => isPrime(n))
// Parallelism overhead not worth it for 10 elements

// GOOD: Pure function (no side effects)
val pureFun = largeList.par.map(n => expensiveCalculation(n))
// Safe to parallelize

// BAD: Function with side effects
var count = 0
val sideEffects = largeList.par.map { n =>
  count += 1 // RACE CONDITION! Multiple threads access count
  n * count
}
// Parallel execution breaks the logic
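The racy counter above is usually better expressed without shared state at all. A minimal sketch: derive each element's index with zipWithIndex before going parallel, so the closure is pure and no thread touches shared mutable state.

```scala
import scala.collection.parallel.CollectionConverters._

val largeList = (1 to 1000).toList

// Pair each element with its (stable, precomputed) index first...
val indexed = largeList.zipWithIndex

// ...then parallelize: the closure is pure, so execution order is irrelevant,
// and par.map preserves element order in the result
val safe = indexed.par.map { case (n, i) => n * (i + 1) }.seq
```

Because the index is fixed before parallelization, the result is deterministic, unlike the version that mutated a shared counter.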

General guidelines:

  • Parallelize if: element count > 100,000 AND work per element > 1ms
  • Don't parallelize: trivial operations, small collections, I/O-bound work
  • Never parallelize: code with side effects or shared mutable state
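The guidelines above can be encoded as a quick pre-flight check. A minimal sketch; the thresholds are this section's rule-of-thumb values, not hard limits, and the helper name is mine:

```scala
// Rough decision helper: true only when all rule-of-thumb conditions hold.
// Thresholds (100,000 elements, 1ms per element) are heuristics.
def shouldParallelize(
    elementCount: Long,
    workPerElementMillis: Double,
    isPure: Boolean,
    isAssociative: Boolean
): Boolean =
  elementCount > 100000 &&
    workPerElementMillis > 1.0 &&
    isPure &&
    isAssociative

shouldParallelize(1000000, 5.0, isPure = true, isAssociative = true)  // true
shouldParallelize(10, 5.0, isPure = true, isAssociative = true)       // false
```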

Associativity Requirements for Parallel Operations

For parallel collections to give correct results, operations must be associative — the grouping doesn't affect the result. This is the core requirement for correctness: if your operation isn't associative, parallel execution will give wrong answers. Why? Because threads process different chunks independently, then combine results. If grouping matters, the order threads process chunks matters, and results become nondeterministic and wrong. Associative operations (like addition) always give the same result regardless of grouping. Non-associative operations (like subtraction) give different results depending on order.

Testing for associativity: (a + b) + c == a + (b + c) should always be true. If you break this property, don't parallelize.

// Associative: (a + b) + c == a + (b + c)
// Parallel is safe
val sum1 = (1 to 1000).sum
val sum2 = (1 to 1000).par.sum
// sum1 == sum2 (always)

// Associative: (a * b) * c == a * (b * c)
val product1 = (1 to 100).product
val product2 = (1 to 100).par.product
// product1 == product2

// NOT associative: (a - b) - c != a - (b - c)
val nums = Vector(100, 20, 5)
val seq = nums.foldLeft(0)(_ - _)  // 0 - 100 - 20 - 5 = -125
val par = nums.par.fold(0)(_ - _)  // Nondeterministic: depends on chunk grouping
// par might not equal seq!

// NOT associative: (a / b) / c != a / (b / c)
val div1 = Vector(1000.0, 10.0, 2.0).reduceLeft(_ / _)  // (1000 / 10) / 2 = 50.0
val div2 = Vector(1000.0, 10.0, 2.0).par.reduce(_ / _)  // Grouping-dependent
// Results can differ because division is not associative

// Correct parallel reduction: provide an associative operation
val result = (1 to 1000000).par
  .map(n => n * n)
  .sum  // sum is associative, so this is safe

// Using fold correctly
val assocOp = (a: Int, b: Int) => a + b
val foldResult = (1 to 100).par.fold(0)(assocOp)
// Correct because + is associative

Testing for associativity:

def testAssociativity[T](op: (T, T) => T, values: Vector[T]): Boolean = {
  require(values.length >= 4, "need at least four sample values")
  val Vector(a, b, c, d) = values.take(4)
  // Compare two different groupings of the same four elements
  val grouped1 = op(op(a, b), op(c, d))
  val grouped2 = op(a, op(op(b, c), d))
  grouped1 == grouped2
}

// Verify associativity for your operations before parallelizing
val safe = testAssociativity((a: Int, b: Int) => a + b, Vector(1, 2, 3, 4))
val unsafe = testAssociativity((a: Int, b: Int) => a - b, Vector(1, 2, 3, 4))
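For more confidence than a single fixed grouping, you can sample many random triples, in the spirit of property-based testing. A hand-rolled sketch (a library like ScalaCheck does this far more thoroughly; the helper name is mine):

```scala
import scala.util.Random

// Check (a op b) op c == a op (b op c) for many random triples.
// Passing is strong evidence, not proof, of associativity.
def isLikelyAssociative(op: (Int, Int) => Int, trials: Int = 1000): Boolean = {
  val rng = new Random(42) // fixed seed for reproducibility
  (1 to trials).forall { _ =>
    val (a, b, c) = (rng.nextInt(100), rng.nextInt(100), rng.nextInt(100))
    op(op(a, b), c) == op(a, op(b, c))
  }
}

isLikelyAssociative(_ + _)  // true
isLikelyAssociative(_ - _)  // false
```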

Akka Streams Basics — Source, Flow, Sink

Akka Streams provides composable, backpressure-aware processing of data streams. It's fundamentally different from parallel collections. Parallel collections load all data into memory at once, then process it in chunks. Streams process data incrementally: one element or small batch at a time, respecting available memory. This difference is crucial for handling large datasets (gigabytes of logs) or infinite streams (network events). With parallel collections, you'd run out of memory. With streams, you process data as it arrives, bounded memory usage regardless of data size.

A stream has three components: a Source produces elements, Flows transform elements, and Sinks consume elements. Each stage is independently backpressure-aware: if a Sink is slow, it signals upstream that it can't accept more data. The source pauses production. Memory stays bounded. This is fundamentally different from queues where production continues until memory is exhausted. Streams handle backpressure automatically—the framework figures out how much data to buffer between stages.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Flow, Sink}
import scala.concurrent.Future

object StreamBasics {
  implicit val system: ActorSystem = ActorSystem("stream-system")
  import system.dispatcher // ExecutionContext for the Future callbacks below

  // Three components of a stream:

  // 1. SOURCE: produces elements
  val numberSource: Source[Int, _] = Source(1 to 100)

  // 2. FLOW: transforms elements
  val doubleFlow: Flow[Int, Int, _] = Flow[Int].map(_ * 2)

  // 3. SINK: consumes elements
  val printSink: Sink[Int, Future[_]] = Sink.foreach(n => println(n))

  // Compose them together
  val pipeline = numberSource
    .via(doubleFlow)
    .to(printSink)

  // Run it (materializes the stream)
  pipeline.run()

  // More realistic example: stream from a file-like source
  val lineSource: Source[String, _] = Source(
    List(
      "user1,purchase,100",
      "user2,purchase,50",
      "user1,return,-20",
      "user3,purchase,200"
    )
  )

  // Parse the lines
  val parseFlow: Flow[String, (String, String, Double), _] = Flow[String].map { line =>
    val parts = line.split(",")
    (parts(0), parts(1), parts(2).toDouble)
  }

  // Filter for purchases only
  val filterFlow: Flow[(String, String, Double), (String, Double), _] =
    Flow[(String, String, Double)]
      .filter { case (_, eventType, _) => eventType == "purchase" }
      .map { case (user, _, amount) => (user, amount) }

  // Aggregate by user
  val aggregateSink: Sink[(String, Double), Future[Map[String, Double]]] =
    Sink.fold[Map[String, Double], (String, Double)](Map()) {
      case (acc, (user, amount)) =>
        acc + (user -> (acc.getOrElse(user, 0.0) + amount))
    }

  // Build and run the complete pipeline.
  // Note: .to(sink) would keep the Source's materialized value (NotUsed),
  // so use toMat(Keep.right) to keep the Sink's Future instead.
  import akka.stream.scaladsl.Keep
  val aggregateStream = lineSource
    .via(parseFlow)
    .via(filterFlow)
    .toMat(aggregateSink)(Keep.right)

  val result: Future[Map[String, Double]] = aggregateStream.run()

  result.foreach { totalsByUser =>
    println(s"Purchase totals by user: $totalsByUser")
    system.terminate()
  }
}

Stream components:

  • Source[T, Mat]: Produces elements of type T, materializes to Mat (the value you get when running the stream)
  • Flow[In, Out, Mat]: Transforms In to Out, materializes to Mat
  • Sink[In, Mat]: Consumes In, materializes to Mat

The Mat (materialized value) is what you get when running the stream. For a Sink, it's often Future[ResultType] (a Future of the final result). For a Source, it might be a handle to cancel the stream. Mat is how you extract useful values from streams.

Why this architecture matters: Streams are lazy and composable. When you build a pipeline with Source.via().to(), nothing runs yet. The pipeline is just a description. When you call .run(), the framework materializes it: creates threads, sets up backpressure signaling, buffers appropriately. This lazy composition lets you build complex pipelines without committing to execution immediately.
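This laziness is easy to observe. A minimal sketch (the system name is mine): defining the pipeline triggers no side effects; only run() does.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("lazy-demo")

// Building the blueprint: nothing prints yet, this is only a description
val blueprint = Source(1 to 3)
  .map { n => println(s"processing $n"); n }
  .to(Sink.ignore)

println("blueprint built, nothing processed yet")

// Materialization: now the stages run and the side effects happen
blueprint.run()
```

Because the blueprint is just a value, you can store it, pass it around, and run it multiple times, each run materializing a fresh, independent stream.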

Backpressure — Why It Matters

Backpressure means a slow consumer can tell a fast producer to slow down. Without it, memory usage explodes as fast producers queue unbounded data. With backpressure, the producer pauses production until the consumer is ready for more. This keeps memory usage bounded regardless of how much data exists upstream. Traditional approaches (unbounded queues) fail catastrophically: memory exhausts, the process dies. Akka Streams handles this automatically: if a Sink can't keep up, it signals the Flows to pause. Flows signal the Source. Everyone pauses until the Sink is ready. It's elegant and scalable.

Think of backpressure like a kitchen order flow. Orders come in (Source). Cooks process them (Flows). Customers pick up orders (Sink). If cooks process faster than customers pick up, orders pile up. In a limited kitchen, this is a disaster—no counter space. Backpressure says: if counters are full, the host stops seating new customers. Fewer new orders arrive. Cooks still work, customers still pick up, but the rate is balanced. No pile-up, no disasters.

The deep reason backpressure matters: data rates are unpredictable. A network source might suddenly spike (DDoS attack, viral content, surge traffic). Without backpressure, this spike fills memory, crashes the process. With backpressure, the pipeline automatically throttles: process slowly, drop oldest data, or fail gracefully. The system remains stable under spikes.
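These policies are explicit in the API. A hedged sketch (the system name is mine): buffer with an OverflowStrategy states exactly what a stage does when downstream can't keep up.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("buffer-demo")

val done = Source(1 to 100000)
  // Hold at most 256 elements; when full, drop the oldest buffered element.
  // Alternatives: dropTail, dropNew, dropBuffer, backpressure (default), fail
  .buffer(256, OverflowStrategy.dropHead)
  .map(_ * 2)
  .runWith(Sink.ignore)
```

Choosing a strategy is choosing a failure mode: backpressure slows the producer, the drop variants shed load, and fail turns overload into an error you can supervise.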

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("backpressure-demo")

// Problem: fast source, slow sink (no backpressure handling)
// Without Akka Streams, this would buffer unbounded data in memory

// With Akka Streams: automatic backpressure
val fastSource = Source(1 to 1000000) // 1 million elements
val slowSink = Sink.foreach[Int] { n =>
  Thread.sleep(10) // Simulate slow processing
  println(s"Processed: $n")
}

val backpressureStream = fastSource.to(slowSink)
backpressureStream.run()

// What happens:
// 1. Source produces elements
// 2. Sink processes them slowly
// 3. Akka Streams detects sink backlog
// 4. Source pauses production
// 5. Memory stays bounded (no 1M elements buffered)

Without backpressure (traditional approach), this is catastrophic:

// DANGEROUS: eagerly enqueues all 1M elements with no bound
val queue = scala.collection.mutable.Queue[Int]()
for (i <- 1 to 1000000) {
  queue.enqueue(i) // Queue grows as fast as the producer runs; nothing pushes back
}

// Now try to consume
while (queue.nonEmpty) {
  val n = queue.dequeue()
  Thread.sleep(10) // The slow consumer never influenced how much was buffered
}

Backpressure in action: The framework automatically manages buffer sizes between stages. Each stage has a demand: "how many elements can I accept?" Slow stages have low demand. Fast stages have high demand. The Source respects downstream demand: it produces only as much as downstream can accept. This coordination happens automatically in Akka Streams. You don't manage it yourself.
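Demand can also be shaped deliberately. A minimal sketch (the system name is mine): throttle caps the rate at which elements flow, and backpressure propagates that limit all the way back to the Source.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("throttle-demo")

// At most 10 elements per second reach the sink; the Source is
// backpressured to match, no matter how fast it could produce.
val done = Source(1 to 50)
  .throttle(10, 1.second)
  .runWith(Sink.foreach[Int](n => println(s"tick $n")))
```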

Stream Composition and Transformation

Streams can be built from smaller pieces, composed together to create complex pipelines. This composability is powerful: you can test individual Flows in isolation, then assemble them into larger pipelines. The final pipeline is flexible: easy to add new Flows, remove Flows, or reorder them. Composition also manages complexity: instead of one giant Source -> complex processing -> Sink, you have Source -> Flow1 -> Flow2 -> Flow3 -> Sink, where each Flow is simple and testable.

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("composition-demo")
import system.dispatcher // ExecutionContext for Future callbacks

// Creating sources
val rangeSource = Source(1 to 100)
val listSource = Source(List("a", "b", "c", "d", "e"))
val futureSource = Source.future(Future.successful(42))

// Creating flows
val addOneFlow = Flow[Int].map(_ + 1)
val filterEvenFlow = Flow[Int].filter(_ % 2 == 0)
val uppercaseFlow = Flow[String].map(_.toUpperCase)

// Chaining flows
val complexFlow: Flow[Int, Int, _] = addOneFlow
  .via(filterEvenFlow)
  .map(_ * 10)
  .via(Flow[Int].map(_ - 5))

// Creating sinks
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val collectSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
val ignoreSink: Sink[Any, Future[_]] = Sink.ignore

// Complete pipeline
val sum: Future[Int] = rangeSource
  .via(addOneFlow)
  .via(filterEvenFlow)
  .toMat(sumSink)(Keep.right)
  .run()

sum.foreach { result =>
  println(s"Sum of even (n+1) values: $result")
}

Keep strategies: When composing streams, you can choose which materialized value to keep:

  • Keep.left: Keep the left side's materialized value (Source's)
  • Keep.right: Keep the right side's materialized value (Sink's)
  • Keep.both: Keep both as a tuple
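A quick sketch of the difference (the system name is mine): with Keep.both, running the stream yields the Source's and the Sink's materialized values together as a tuple.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("keep-demo")

// srcMat: NotUsed (from the Source); sinkMat: Future[Done] (from Sink.foreach)
val (srcMat, sinkMat) = Source(1 to 10)
  .toMat(Sink.foreach[Int](println))(Keep.both)
  .run()
```

Keep.left is the default for .to and .via, which is why earlier examples needed toMat(Keep.right) to get at a Sink's Future.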

Practical Example: Processing a Log File Stream with Backpressure

A realistic log processing pipeline that handles large files. The key insight: even if the log file is 100GB, memory usage stays bounded. The file is read one line at a time (or small chunks), processed, and output. If processing is slow, the file reading pauses. No memory explosion. This is the power of Akka Streams: unlimited data sizes, bounded memory. Without streams, you'd either read the entire file into memory (crashes on huge files) or manually manage backpressure (complex and error-prone).

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future
import java.io.File

object LogStreamProcessor {
  case class LogEntry(timestamp: String, level: String, message: String)

  implicit val system: ActorSystem = ActorSystem("log-processor")
  import system.dispatcher // ExecutionContext for the Future callbacks in main

  def processLogFile(filePath: String): Future[LogAnalysis] = {
    // Source: read lines from file (one at a time, memory-efficient)
    val fileSource: Source[String, _] = FileIO.fromPath(
      java.nio.file.Paths.get(filePath)
    )
      .via(Framing.delimiter(
        akka.util.ByteString("\n"),
        maximumFrameLength = 512
      ))
      .map(_.utf8String)

    // Flow 1: Parse log lines
    val parseFlow: Flow[String, LogEntry, _] = Flow[String]
      .map { line =>
        val parts = line.split(" \\| ")
        if (parts.length >= 3) {
          Some(LogEntry(parts(0), parts(1), parts(2)))
        } else {
          None
        }
      }
      .collect { case Some(entry) => entry }

    // Flow 2: Filter for errors
    val filterFlow: Flow[LogEntry, LogEntry, _] = Flow[LogEntry]
      .filter(_.level == "ERROR")

    // Flow 3: Enrich with metadata
    val enrichFlow: Flow[LogEntry, EnrichedLogEntry, _] = Flow[LogEntry]
      .map { entry =>
        EnrichedLogEntry(
          entry,
          severity = calculateSeverity(entry.message),
          timestamp = System.currentTimeMillis()
        )
      }

    // Sink: Aggregate statistics
    val analysisSink: Sink[EnrichedLogEntry, Future[LogAnalysis]] =
      Sink.fold[LogAnalysis, EnrichedLogEntry](LogAnalysis()) {
        case (acc, entry) =>
          acc.addEntry(entry)
      }

    // Build the stream
    fileSource
      .via(parseFlow)
      .via(filterFlow)
      .via(enrichFlow)
      .toMat(analysisSink)(Keep.right)
      .run()
  }

  case class EnrichedLogEntry(
      entry: LogEntry,
      severity: Int,
      timestamp: Long
  )

  case class LogAnalysis(
      errorCount: Int = 0,
      highSeverityCount: Int = 0,
      messages: List[String] = List()
  ) {
    def addEntry(entry: EnrichedLogEntry): LogAnalysis = {
      copy(
        errorCount = errorCount + 1,
        highSeverityCount = if (entry.severity > 8) highSeverityCount + 1 else highSeverityCount,
        messages = entry.entry.message :: messages.take(9)
      )
    }
  }

  def calculateSeverity(message: String): Int = {
    if (message.contains("CRITICAL")) 10
    else if (message.contains("FATAL")) 9
    else if (message.contains("PANIC")) 9
    else 5
  }

  def main(args: Array[String]): Unit = {
    val analysisResult = processLogFile("/var/log/app.log")

    analysisResult.foreach { analysis =>
      println(s"Log Analysis:")
      println(s"  Total errors: ${analysis.errorCount}")
      println(s"  High severity: ${analysis.highSeverityCount}")
      println(s"  Recent messages: ${analysis.messages.take(5)}")
      system.terminate()
    }

    analysisResult.failed.foreach { ex =>
      println(s"Stream failed: ${ex.getMessage}")
      system.terminate()
    }
  }
}

This example demonstrates:

  • Memory efficiency: Lines read one at a time, not buffered
  • Backpressure: If parsing is slow, file reading pauses
  • Composability: Easy to add new flows (filter, enrich, aggregate)
  • Robustness: Streaming framework handles all threading details

The stream automatically:

  • Manages thread pools
  • Handles backpressure signals
  • Buffers appropriately between stages
  • Cleans up resources on completion or error
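When you need to stop an otherwise unbounded stream yourself, a KillSwitch gives you an external handle for exactly that cleanup. A hedged sketch (the system name is mine):

```scala
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem("killswitch-demo")

// viaMat(Keep.right) keeps the kill switch; toMat(Keep.both) adds the
// Sink's completion Future, so we get both materialized values
val (switch: UniqueKillSwitch, done) = Source(LazyList.from(1)) // infinite source
  .viaMat(KillSwitches.single[Int])(Keep.right)
  .toMat(Sink.foreach[Int](println))(Keep.both)
  .run()

// Later, from anywhere: completes the stream and releases its resources
switch.shutdown()
```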

Conclusion

Part V covered Scala's concurrency models:

  1. Futures: Non-blocking asynchronous computation with elegant composition
  2. Actors: Isolated message-passing entities with supervision and self-healing
  3. Parallel Collections & Streams: Automatic parallelism and efficient streaming

Mastering these three paradigms gives you the tools to write:

  • Responsive applications that don't block threads
  • Resilient systems that recover from failures gracefully
  • Efficient programs that make full use of modern hardware

The key principle: avoid shared mutable state. Whether through Futures' functional composition, Actors' message passing, or Streams' backpressure, structured concurrency beats traditional threading.

PART VI: ADVANCED TYPE SYSTEM CONCEPTS