Spring WebFlux: A Complete Guide to Reactive Programming in Spring Boot

Introduction

Modern web applications often have to serve thousands of concurrent requests efficiently. Traditional blocking, synchronous programming models tie up one thread per in-flight request, so under heavy load they can run into thread exhaustion and degraded performance. Spring WebFlux is Spring’s reactive, non-blocking web framework, built to handle many concurrent connections with a small number of threads.


Table of Contents

  1. Understanding Reactive Programming
  2. Blocking vs Non-Blocking Architecture
  3. Reactive Streams API Specification
  4. Introduction to Spring WebFlux
  5. Project Reactor: Mono and Flux
  6. Setting Up Your First WebFlux Application
  7. Building Reactive REST APIs (Annotation-Based)
  8. Functional Endpoints: An Alternative Approach
  9. Reactive Database Operations with MongoDB
  10. Error Handling in Reactive Applications
  11. Backpressure Management
  12. Testing WebFlux Applications
  13. Performance Comparison: Spring MVC vs WebFlux
  14. Best Practices and When to Use WebFlux

1. Understanding Reactive Programming

Reactive programming is a declarative programming paradigm focused on data streams and the propagation of change. It’s built around reacting to events rather than actively waiting for operations to complete.

Core Principles

  • Asynchronous and Non-Blocking: Operations don’t block threads while waiting for results
  • Event-Driven: The application reacts to events as they occur
  • Backpressure Support: Consumers can signal producers to slow down when overwhelmed
  • Data Streams: Everything is treated as a stream of data that can be observed and transformed

The Observer Pattern at Scale

Reactive programming extends the Observer Pattern where:

  • Publishers emit data
  • Subscribers consume data
  • The flow is controlled through subscriptions
  • Operators transform, filter, and combine streams
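
To make these roles concrete, here is a minimal sketch in plain Java. The names MiniPublisher and MiniPublisherDemo are hypothetical and exist only for illustration. The sketch deliberately leaves out subscriptions, backpressure, errors, and completion signals, all of which a real library such as Project Reactor provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, heavily simplified publisher: it only knows how to push items to a subscriber.
@FunctionalInterface
interface MiniPublisher<T> {

    void subscribe(Consumer<? super T> subscriber);

    // Publisher that emits every item of the list to each new subscriber.
    static <T> MiniPublisher<T> of(List<T> items) {
        return subscriber -> items.forEach(subscriber);
    }

    // Operator: wraps this publisher and transforms each item on its way downstream.
    default <R> MiniPublisher<R> map(Function<? super T, ? extends R> fn) {
        return subscriber -> subscribe(item -> subscriber.accept(fn.apply(item)));
    }

    // Operator: wraps this publisher and forwards only the items that pass the test.
    default MiniPublisher<T> filter(Predicate<? super T> keep) {
        return subscriber -> subscribe(item -> {
            if (keep.test(item)) {
                subscriber.accept(item);
            }
        });
    }
}

class MiniPublisherDemo {

    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniPublisher.of(List.of(1, 2, 3, 4))
            .filter(n -> n % 2 == 0)      // keeps 2 and 4
            .map(n -> "item-" + n)        // "item-2", "item-4"
            .subscribe(out::add);         // nothing runs until this call
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each operator returns a new publisher that wraps the previous one, which is why building the chain does no work by itself; the data only flows once subscribe is called.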

2. Blocking vs Non-Blocking Architecture

Understanding the fundamental difference between blocking and non-blocking models is crucial to appreciating WebFlux’s value.

Blocking Request Processing (Traditional Spring MVC)

In traditional Spring MVC applications:

  1. Thread-per-Request Model: Each incoming request gets assigned a dedicated thread from a thread pool
  2. Blocking I/O: When the thread performs I/O operations (database queries, external API calls), it waits (blocks) until the operation completes
  3. Thread Pool Limitations: The server can process only as many requests at the same time as there are threads in the pool (embedded Tomcat, for example, defaults to 200 worker threads)
  4. Resource Intensive: Each thread reserves memory for its stack (1 MB by default on most 64-bit JVMs)

Example Flow:

Request 1 → Thread 1 → [Waiting for Database] → Response
Request 2 → Thread 2 → [Waiting for Database] → Response
Request 3 → Thread 3 → [Waiting for External API] → Response
...
Request 201 → [WAITING - all 200 pool threads are busy]

Limitations:

  • Under heavy load, threads get exhausted
  • Threads spend significant time in a waiting state (CPU idle)
  • Limited scalability due to thread pool constraints
  • High memory consumption

Non-Blocking Request Processing (Spring WebFlux)

Spring WebFlux adopts a fundamentally different approach:

  1. Event Loop Model: Uses a small, fixed number of threads (typically matching CPU cores)
  2. Non-Blocking I/O: When an I/O operation is needed, the thread registers a callback and immediately moves to process other requests
  3. Callback Execution: When the I/O operation completes, the callback is invoked to continue processing
  4. High Concurrency: Can handle thousands of concurrent requests with minimal threads

Example Flow:

Request 1 → Thread 1 → Initiate DB Query → [Switch to Request 2]
Request 2 → Thread 1 → Initiate API Call → [Switch to Request 3]
Request 3 → Thread 2 → Initiate DB Query → [Switch to Request 4]
...
[DB Result for Request 1] → Thread 1 → Complete Request 1
[API Result for Request 2] → Thread 2 → Complete Request 2

Advantages:

  • Better resource utilization
  • Improved scalability under high concurrency
  • Lower memory footprint
  • Efficient handling of I/O-bound operations

Important Note: Reactive programming doesn’t make individual requests faster. It improves throughput and scalability by allowing more concurrent requests with fewer resources.
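
The following JDK-only sketch illustrates that point. It does not use WebFlux or Netty; the class name, the 50 ms delay, and the use of a scheduled timer as a stand-in for non-blocking I/O are all assumptions made for the demonstration. Both variants get the same small number of threads and the same per-request latency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class BlockingVsNonBlocking {

    // Simulated I/O latency per request (hypothetical value).
    static final long IO_MILLIS = 50;

    // Blocking style: every request parks a pool thread for the whole I/O wait.
    static long blockingMillis(int requests, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            List<Future<String>> responses = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                int id = i;
                responses.add(pool.submit(() -> {
                    Thread.sleep(IO_MILLIS);      // the thread is held but idle
                    return "response-" + id;
                }));
            }
            for (Future<String> response : responses) {
                response.get();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Non-blocking style: the wait is a timer registration plus a callback,
    // so no thread is held while the simulated I/O is in flight.
    static long nonBlockingMillis(int requests, int threads) {
        ScheduledExecutorService eventLoop = Executors.newScheduledThreadPool(threads);
        long start = System.nanoTime();
        try {
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                int id = i;
                CompletableFuture<String> response = new CompletableFuture<>();
                eventLoop.schedule(() -> response.complete("response-" + id),
                        IO_MILLIS, TimeUnit.MILLISECONDS);
                responses.add(response);
            }
            CompletableFuture.allOf(responses.toArray(new CompletableFuture[0])).join();
        } finally {
            eventLoop.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + blockingMillis(20, 2) + " ms");
        System.out.println("non-blocking: " + nonBlockingMillis(20, 2) + " ms");
    }
}
```

With 20 requests and 2 threads, the blocking variant needs at least ten rounds of waiting, while the non-blocking variant finishes after roughly one I/O delay. No single request got faster; the threads simply stopped waiting.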


3. Reactive Streams API Specification

The Reactive Streams specification defines a standard for asynchronous stream processing with non-blocking backpressure. It was created by engineers from Netflix, Pivotal, Lightbend, Red Hat, Twitter, and Oracle, among others. Its four interfaces live in the org.reactivestreams package, and since Java 9 the JDK ships equivalent copies as nested interfaces of java.util.concurrent.Flow.

Four Core Interfaces

3.1 Publisher

The Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscribers.

Publisher.java
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

Key Points:

  • Can serve multiple subscribers
  • Produces elements based on demand
  • Never emits more elements than its subscribers have requested

3.2 Subscriber

The Subscriber receives and processes events emitted by a Publisher.

Subscriber.java
public interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T element);
    public void onError(Throwable throwable);
    public void onComplete();
}

Method Descriptions:

  • onSubscribe(): Called when subscription is established
  • onNext(): Called for each element in the stream
  • onError(): Called when an error occurs
  • onComplete(): Called when the stream completes successfully

Important: No elements are received until subscription.request(n) is called to signal demand.
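
As a small illustration of that rule, here is a sketch that uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces shown above. OneByOneSubscriber and OneByOneDemo are hypothetical names; SubmissionPublisher is the JDK's own Flow.Publisher implementation and is used here only as a convenient source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber that pulls exactly one element at a time.
class OneByOneSubscriber<T> implements Flow.Subscriber<T> {

    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch terminated = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);      // without this call, onNext is never invoked
    }

    @Override
    public void onNext(T element) {
        received.add(element);
        subscription.request(1);      // signal demand for the next element
    }

    @Override
    public void onError(Throwable throwable) {
        terminated.countDown();
    }

    @Override
    public void onComplete() {
        terminated.countDown();
    }
}

class OneByOneDemo {

    static List<String> run() {
        OneByOneSubscriber<String> subscriber = new OneByOneSubscriber<>();
        // Leaving the try block calls close(), which sends onComplete after the buffered items.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            List.of("a", "b", "c").forEach(publisher::submit);
        }
        try {
            if (!subscriber.terminated.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Removing the request(1) call in onSubscribe would leave the subscriber connected but idle: the publisher would buffer the items and never deliver them.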

3.3 Subscription

The Subscription represents a one-to-one lifecycle between a Publisher and a Subscriber.

Subscription.java
public interface Subscription {
    public void request(long n);
    public void cancel();
}

Purpose:

  • request(n): Signals demand for n elements
  • cancel(): Cancels the subscription and allows resource cleanup
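
To show how request(n) and cancel() drive a publisher, here is a single-threaded sketch built on the JDK's java.util.concurrent.Flow interfaces (which mirror the ones above). RangePublisher and RangeDemo are hypothetical names, and the code is a teaching aid under simplifying assumptions (one thread, no concurrency handling), not a specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher that emits `count` consecutive integers beginning at `start`.
class RangePublisher implements Flow.Publisher<Integer> {

    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new RangeSubscription(subscriber));
    }

    private final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private int next = start;
        private long demand;
        private boolean emitting;
        private boolean finished;   // cancelled, failed, or completed

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (finished) {
                return;
            }
            if (n <= 0) {
                finished = true;
                subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;  // cap on overflow
            if (emitting) {
                return;             // re-entrant call from onNext; the loop below picks it up
            }
            emitting = true;
            while (demand > 0 && !finished && next < start + count) {
                demand--;
                subscriber.onNext(next++);
            }
            emitting = false;
            if (!finished && next == start + count) {
                finished = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            finished = true;        // stop emitting; this sketch holds no other resources
        }
    }
}

class RangeDemo {

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Flow.Subscription[] handle = new Flow.Subscription[1];
        new RangePublisher(1, 10).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { handle[0] = s; }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        handle[0].request(2);   // delivers 1 and 2
        handle[0].request(1);   // delivers 3
        handle[0].cancel();     // no further elements are delivered
        handle[0].request(5);   // ignored after cancel
        return received;        // [1, 2, 3]
    }
}
```

The publisher could emit ten values, but the subscriber only ever sees the three it asked for. That is backpressure in its simplest form.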

3.4 Processor

The Processor represents a processing stage that is both a Subscriber and a Publisher.

Processor.java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Use Cases:

  • Transform data between stages
  • Implement intermediate processing steps
  • Act as a bridge between publishers and subscribers
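
A processor can be sketched with the JDK's Flow API as well (the Flow interfaces mirror the ones above). The example below follows the pattern of extending SubmissionPublisher so that the publishing half comes for free; MappingProcessor and ProcessorDemo are hypothetical names chosen for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical mapping stage: subscribes to T upstream and publishes R downstream.
class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T element) {
        submit(mapper.apply(element));   // publish the transformed element downstream
        upstream.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);   // forward the failure downstream
    }

    @Override
    public void onComplete() {
        close();                         // forward completion downstream
    }
}

class ProcessorDemo {

    static List<Integer> run() {
        List<Integer> lengths = new CopyOnWriteArrayList<>();
        MappingProcessor<String, Integer> toLength = new MappingProcessor<>(String::length);
        // consume(...) attaches a simple subscriber and completes when the stream ends.
        CompletableFuture<Void> finished = toLength.consume(lengths::add);
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(toLength);
            List.of("Mono", "Flux", "Reactor").forEach(source::submit);
        }
        finished.orTimeout(5, TimeUnit.SECONDS).join();
        return lengths;                  // [4, 4, 7]
    }
}
```

In day-to-day WebFlux code you rarely write a processor yourself; operators such as map() play this role inside the library.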

Popular Implementations:

  1. Project Reactor (used by Spring WebFlux)
  2. RxJava (by ReactiveX)
  3. Akka Streams (by Lightbend)

4. Introduction to Spring WebFlux

Spring WebFlux is Spring’s reactive web framework introduced in Spring 5.0. It provides a fully non-blocking, reactive alternative to Spring MVC.

Key Features

  1. Fully Non-Blocking: Built on Reactive Streams specification
  2. Backpressure Support: Handles flow control automatically
  3. Runs on Netty: Default embedded server (also supports Tomcat, Undertow, Jetty)
  4. Two Programming Models:
    • Annotation-based (similar to Spring MVC)
    • Functional endpoints (using RouterFunction)
  5. Built on Project Reactor: Uses Mono and Flux as reactive types

Architecture Overview

┌─────────────────────────────────────────────────┐
│           Client (Browser/App)                  │
└──────────────────┬──────────────────────────────┘
                   │ HTTP Request
                   ▼
┌─────────────────────────────────────────────────┐
│    Netty Server (Event Loop)                    │
│    - Non-blocking I/O                           │
│    - Small thread pool                          │
└──────────────────┬──────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────┐
│    Spring WebFlux Layer                         │
│    - RouterFunction / @RestController           │
│    - HandlerFunction / Controller Methods       │
└──────────────────┬──────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────┐
│    Service Layer (Reactive)                     │
│    - Returns Mono<T> or Flux<T>                 │
└──────────────────┬──────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────┐
│    Reactive Repository                          │
│    - ReactiveMongoRepository                    │
│    - R2DBC Repository                           │
└─────────────────────────────────────────────────┘

Spring MVC vs Spring WebFlux

Feature            Spring MVC                    Spring WebFlux
-----------------  ----------------------------  ------------------------------
Programming Model  Imperative, synchronous       Reactive, asynchronous
Concurrency        Thread-per-request            Event-loop, few threads
Blocking           Blocking I/O by default       Non-blocking I/O
Return Types       Objects, Collections          Mono, Flux
Server             Servlet containers (Tomcat)   Netty, Undertow, Servlet 3.1+
Best For           Traditional CRUD apps         High concurrency, streaming
Learning Curve     Easier for beginners          Steeper (reactive concepts)
Performance        Good for moderate load        Scales better under high load


5. Project Reactor: Mono and Flux

Project Reactor is the reactive library that powers Spring WebFlux. It provides two main publisher implementations: Mono and Flux.

5.1 Mono - Zero or One Element

Mono<T> represents an asynchronous sequence that emits 0 or 1 element, then completes (with or without an error).

Use Cases:

  • Single database record lookup
  • HTTP request that returns one response
  • Result of a computation

Creating Mono:

// Create Mono with a single value
Mono<String> mono1 = Mono.just("Hello");

// Create empty Mono
Mono<String> mono2 = Mono.empty();

// Create Mono from Callable
Mono<String> mono3 = Mono.fromCallable(() -> "Hello from Callable");

// Create Mono from Supplier
Mono<String> mono4 = Mono.fromSupplier(() -> "Hello from Supplier");

// Create error Mono
Mono<String> mono5 = Mono.error(new RuntimeException("Error occurred"));

// Defer creation until subscription
Mono<String> mono6 = Mono.defer(() -> Mono.just("Deferred value"));

Common Mono Operations:

// Transform data
Mono<String> mono = Mono.just("hello")
    .map(String::toUpperCase)  // "HELLO"
    .map(s -> s + " WORLD");   // "HELLO WORLD"

// Flat map for nested async operations
Mono<User> userMono = Mono.just(userId)
    .flatMap(id -> userRepository.findById(id))
    .flatMap(user -> profileService.enrichProfile(user));

// Filter
Mono<Integer> evenNumber = Mono.just(10)
    .filter(n -> n % 2 == 0);  // Emits 10

// Default value if empty (the explicit <String> is needed for type inference)
Mono<String> result = Mono.<String>empty()
    .defaultIfEmpty("Default Value");

// Subscribe to receive values
mono.subscribe(
    value -> System.out.println("Received: " + value),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed")
);

5.2 Flux - Zero to N Elements

Flux<T> represents an asynchronous sequence that emits 0 to N elements, then completes (with or without an error).

Use Cases:

  • List of database records
  • Stream of events
  • Multiple results from a query

Creating Flux:

// Create Flux from multiple values
Flux<String> flux1 = Flux.just("Apple", "Banana", "Cherry");

// Create from array
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});

// Create from collection
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("X", "Y", "Z"));

// Create from Stream
Flux<String> flux4 = Flux.fromStream(Stream.of("One", "Two", "Three"));

// Create range of numbers
Flux<Integer> flux5 = Flux.range(1, 10);  // 1 to 10

// Create interval (emits every duration)
Flux<Long> flux6 = Flux.interval(Duration.ofSeconds(1));

// Create empty Flux
Flux<String> flux7 = Flux.empty();

Common Flux Operations:

// Map transformation
Flux<String> upperCase = Flux.just("apple", "banana", "cherry")
    .map(String::toUpperCase);

// Filter
Flux<Integer> evenNumbers = Flux.range(1, 10)
    .filter(n -> n % 2 == 0);

// FlatMap (for nested async operations)
Flux<Order> orders = Flux.just(user1, user2, user3)
    .flatMap(user -> orderRepository.findByUserId(user.getId()));

// Take first N elements
Flux<Integer> firstThree = Flux.range(1, 100)
    .take(3);  // 1, 2, 3

// Skip first N elements
Flux<Integer> skipTwo = Flux.range(1, 10)
    .skip(2);  // 3, 4, 5, ..., 10

// Collect to List
Mono<List<String>> list = Flux.just("A", "B", "C")
    .collectList();

// Merge multiple Flux
Flux<String> merged = Flux.merge(flux1, flux2, flux3);

// Zip (combine elements pairwise)
Flux<Tuple2<String, Integer>> zipped = Flux.zip(
    Flux.just("A", "B", "C"),
    Flux.just(1, 2, 3)
);

// Buffer elements
Flux<List<Integer>> buffered = Flux.range(1, 10)
    .buffer(3);  // [1,2,3], [4,5,6], [7,8,9], [10]

5.3 Hot vs Cold Publishers

Cold Publishers:

  • Don’t emit data until a subscriber subscribes
  • Each subscriber gets its own independent stream
  • Examples: Mono, Flux created from data sources

Flux<Integer> coldFlux = Flux.range(1, 5)
    .map(i -> {
        System.out.println("Generating: " + i);
        return i;
    });

// Nothing has been printed yet: the pipeline runs only when it is subscribed to

coldFlux.subscribe(i -> System.out.println("Subscriber 1: " + i));
// Outputs: Generating: 1, Subscriber 1: 1, Generating: 2, ...

coldFlux.subscribe(i -> System.out.println("Subscriber 2: " + i));
// Outputs: Generating: 1, Subscriber 2: 1, Generating: 2, ...
// Note: Data is generated again for the second subscriber

Hot Publishers:

  • Emit data regardless of subscribers
  • Multiple subscribers share the same stream
  • Late subscribers may miss earlier events
  • Examples: UI events, broadcast streams

Flux<Long> hotFlux = Flux.interval(Duration.ofSeconds(1))
    .share();  // Convert to hot publisher; emission starts with the first subscriber

hotFlux.subscribe(i -> System.out.println("Subscriber 1: " + i));

Thread.sleep(2500);  // Wait 2.5 seconds (declare or handle InterruptedException)

hotFlux.subscribe(i -> System.out.println("Subscriber 2: " + i));
// Subscriber 2 misses the first 2 elements (0 and 1)

6. Setting Up Your First WebFlux Application

Let’s create a complete Spring Boot WebFlux application from scratch.

6.1 Maven Dependencies

Create a new Spring Boot project and add these dependencies to your pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    
    <groupId>com.example</groupId>
    <artifactId>webflux-demo</artifactId>
    <version>1.0.0</version>
    <name>WebFlux Demo</name>
    
    <properties>
        <java.version>17</java.version>
    </properties>
    
    <dependencies>
        <!-- Spring WebFlux -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        
        <!-- Reactive MongoDB -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
        
        <!-- Validation -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        
        <!-- Lombok (optional, for reducing boilerplate) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Dependency Explanation:

  • spring-boot-starter-webflux: Core WebFlux functionality with Netty server
  • spring-boot-starter-data-mongodb-reactive: Reactive MongoDB driver
  • spring-boot-starter-validation: Bean validation
  • reactor-test: Testing utilities for reactive code

6.2 Configuration Classes

WebFlux Configuration:

WebFluxConfig.java
package com.example.webflux.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.WebFluxConfigurer;

// Note: there is deliberately no @EnableWebFlux here. In a Spring Boot
// application that annotation switches off Boot's WebFlux auto-configuration;
// implementing WebFluxConfigurer alone keeps the defaults and lets you adjust them.
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {
    
    // You can customize WebFlux behavior here
    // For example: configure CORS, add formatters, etc.
    
}

MongoDB Configuration:

MongoConfig.java
package com.example.webflux.config;

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;

@Configuration
@EnableReactiveMongoRepositories(basePackages = "com.example.webflux.repository")
public class MongoConfig extends AbstractReactiveMongoConfiguration {
    
    @Value("${spring.data.mongodb.database:testdb}")
    private String databaseName;
    
    @Value("${spring.data.mongodb.uri:mongodb://localhost:27017}")
    private String mongoUri;
    
    @Override
    protected String getDatabaseName() {
        return databaseName;
    }
    
    @Override
    @Bean
    public MongoClient reactiveMongoClient() {
        return MongoClients.create(mongoUri);
    }
    
    // No explicit ReactiveMongoTemplate bean is declared here: the base class
    // already registers one, wired with its own database factory and converter.
}

Application Properties:

# application.properties
server.port=8080

# MongoDB Configuration
spring.data.mongodb.uri=mongodb://localhost:27017
spring.data.mongodb.database=employeedb

# Logging
logging.level.org.springframework.data.mongodb=DEBUG
logging.level.reactor.netty=INFO

6.3 Application Main Class

WebFluxDemoApplication.java
package com.example.webflux;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebFluxDemoApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(WebFluxDemoApplication.class, args);
    }
}

7. Building Reactive REST APIs (Annotation-Based)

Let’s build a complete Employee Management System using the annotation-based approach.

7.1 Domain Model

Employee.java
package com.example.webflux.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Positive;

@Document(collection = "employees")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
    
    @Id
    private String id;
    
    @NotBlank(message = "Name is required")
    private String name;
    
    @NotBlank(message = "Department is required")
    private String department;
    
    @Positive(message = "Salary must be positive")
    private Double salary;
    
    private String email;
}

Explanation:

  • @Document: Marks this as a MongoDB document
  • @Id: MongoDB’s unique identifier
  • @Data: Lombok annotation generating getters, setters, toString, etc.
  • Validation annotations ensure data integrity

7.2 Repository Layer

EmployeeRepository.java 
package com.example.webflux.repository;

import com.example.webflux.model.Employee;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Repository
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {
    
    // Custom query methods
    
    // Find by name
    Flux<Employee> findByName(String name);
    
    // Find by department
    Flux<Employee> findByDepartment(String department);
    
    // Find by salary range
    Flux<Employee> findBySalaryBetween(Double minSalary, Double maxSalary);
    
    // Custom query using @Query ($gte makes the bound inclusive: salary >= ?0)
    @Query("{ 'salary': { $gte: ?0 } }")
    Flux<Employee> findEmployeesWithSalaryGreaterThan(Double salary);
    
    // Find by email
    Mono<Employee> findByEmail(String email);
    
    // Check if employee exists by email
    Mono<Boolean> existsByEmail(String email);
}

Key Points:

  • ReactiveMongoRepository: Provides reactive CRUD operations
  • Methods return Mono or Flux instead of synchronous types
  • Spring Data automatically implements query methods based on method names
  • @Query allows custom MongoDB queries

7.3 Service Layer

EmployeeService.java
package com.example.webflux.service;

import com.example.webflux.model.Employee;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface EmployeeService {
    
    Mono<Employee> createEmployee(Employee employee);
    
    Mono<Employee> getEmployeeById(String id);
    
    Flux<Employee> getAllEmployees();
    
    Flux<Employee> getEmployeesByDepartment(String department);
    
    Mono<Employee> updateEmployee(String id, Employee employee);
    
    Mono<Void> deleteEmployee(String id);
    
    Flux<Employee> findHighEarners(Double minSalary);
}

Service Implementation:

EmployeeServiceImpl.java 
package com.example.webflux.service.impl;

import com.example.webflux.exception.EmployeeNotFoundException;
import com.example.webflux.model.Employee;
import com.example.webflux.repository.EmployeeRepository;
import com.example.webflux.service.EmployeeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
@Slf4j
public class EmployeeServiceImpl implements EmployeeService {
    
    private final EmployeeRepository employeeRepository;
    
    @Override
    public Mono<Employee> createEmployee(Employee employee) {
        log.info("Creating employee: {}", employee.getName());
        
        // Check if email already exists
        return employeeRepository.existsByEmail(employee.getEmail())
            .flatMap(exists -> {
                if (exists) {
                    return Mono.error(new IllegalArgumentException(
                        "Employee with email " + employee.getEmail() + " already exists"
                    ));
                }
                return employeeRepository.save(employee);
            })
            .doOnSuccess(saved -> log.info("Employee created with ID: {}", saved.getId()))
            .doOnError(error -> log.error("Error creating employee", error));
    }
    
    @Override
    public Mono<Employee> getEmployeeById(String id) {
        log.info("Fetching employee with ID: {}", id);
        
        return employeeRepository.findById(id)
            .switchIfEmpty(Mono.error(
                new EmployeeNotFoundException("Employee not found with ID: " + id)
            ))
            .doOnSuccess(emp -> log.info("Found employee: {}", emp.getName()));
    }
    
    @Override
    public Flux<Employee> getAllEmployees() {
        log.info("Fetching all employees");
        
        return employeeRepository.findAll()
            .doOnComplete(() -> log.info("Fetched all employees"))
            .doOnError(error -> log.error("Error fetching employees", error));
    }
    
    @Override
    public Flux<Employee> getEmployeesByDepartment(String department) {
        log.info("Fetching employees in department: {}", department);
        
        return employeeRepository.findByDepartment(department)
            .doOnComplete(() -> log.info("Completed fetching employees for department: {}", department));
    }
    
    @Override
    public Mono<Employee> updateEmployee(String id, Employee employee) {
        log.info("Updating employee with ID: {}", id);
        
        return employeeRepository.findById(id)
            .switchIfEmpty(Mono.error(
                new EmployeeNotFoundException("Employee not found with ID: " + id)
            ))
            .flatMap(existingEmployee -> {
                // Update fields
                existingEmployee.setName(employee.getName());
                existingEmployee.setDepartment(employee.getDepartment());
                existingEmployee.setSalary(employee.getSalary());
                existingEmployee.setEmail(employee.getEmail());
                
                return employeeRepository.save(existingEmployee);
            })
            .doOnSuccess(updated -> log.info("Employee updated: {}", updated.getId()));
    }
    
    @Override
    public Mono<Void> deleteEmployee(String id) {
        log.info("Deleting employee with ID: {}", id);
        
        return employeeRepository.findById(id)
            .switchIfEmpty(Mono.error(
                new EmployeeNotFoundException("Employee not found with ID: " + id)
            ))
            .flatMap(employeeRepository::delete)
            .doOnSuccess(v -> log.info("Employee deleted: {}", id));
    }
    
    @Override
    public Flux<Employee> findHighEarners(Double minSalary) {
        log.info("Finding employees with salary >= {}", minSalary);
        
        return employeeRepository.findEmployeesWithSalaryGreaterThan(minSalary)
            .doOnComplete(() -> log.info("Completed finding high earners"));
    }
}

Important Reactive Operators Used:

  • flatMap(): Used for chaining async operations (returns Mono/Flux)
  • map(): Used for synchronous transformations
  • switchIfEmpty(): Provides fallback if source is empty
  • doOnSuccess(), doOnError(), doOnComplete(): Side-effect operations for logging
  • filter(): Filters elements based on predicates
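
The difference between map() and flatMap() is the one that trips up most newcomers. As a rough analogy using only the JDK (this is CompletableFuture, not Reactor, and findNameById is a hypothetical stand-in for a reactive repository call): thenApply plays the role of map(), and thenCompose plays the role of flatMap().

```java
import java.util.concurrent.CompletableFuture;

class MapVsFlatMapAnalogy {

    // Hypothetical async lookup standing in for something like a repository findById call.
    static CompletableFuture<String> findNameById(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Using the "map" analogue with an async function leaves a nested async type.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.completedFuture(42)
            .thenApply(MapVsFlatMapAnalogy::findNameById);
    }

    static String demo() {
        return CompletableFuture.completedFuture(42)
            // thenCompose ~ flatMap: the function returns an async result, which is flattened
            .thenCompose(MapVsFlatMapAnalogy::findNameById)
            // thenApply ~ map: a plain synchronous transformation of the value
            .thenApply(String::toUpperCase)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints USER-42
    }
}
```

The same reasoning applies in the service above: repository calls return a Mono, so they are chained with flatMap(); a plain value-to-value conversion would use map().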

7.4 REST Controller

EmployeeController.java 
package com.example.webflux.controller;

import com.example.webflux.model.Employee;
import com.example.webflux.service.EmployeeService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@RestController
@RequestMapping("/api/employees")
@RequiredArgsConstructor
@Slf4j
public class EmployeeController {
    
    private final EmployeeService employeeService;
    
    /**
     * Create a new employee
     * POST /api/employees
     */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Employee> createEmployee(@Valid @RequestBody Employee employee) {
        log.info("REST request to create employee: {}", employee.getName());
        return employeeService.createEmployee(employee);
    }
    
    /**
     * Get employee by ID
     * GET /api/employees/{id}
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<Employee>> getEmployeeById(@PathVariable String id) {
        log.info("REST request to get employee by ID: {}", id);
        
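        // Note: the service signals EmployeeNotFoundException for an unknown ID rather
        // than completing empty, so defaultIfEmpty is only a safety net here; the
        // exception itself has to be mapped to 404 by an error handler.
        return employeeService.getEmployeeById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());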
    }
    
    /**
     * Get all employees
     * GET /api/employees
     */
    @GetMapping
    public Flux<Employee> getAllEmployees() {
        log.info("REST request to get all employees");
        return employeeService.getAllEmployees();
    }
    
    /**
     * Stream employees with Server-Sent Events
     * GET /api/employees/stream
     * This keeps the connection open and streams data as it becomes available
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Employee> streamAllEmployees() {
        log.info("REST request to stream all employees");
        
        // Add delay to simulate streaming
        return employeeService.getAllEmployees()
            .delayElements(Duration.ofSeconds(1));
    }
    
    /**
     * Get employees by department
     * GET /api/employees/department/{department}
     */
    @GetMapping("/department/{department}")
    public Flux<Employee> getEmployeesByDepartment(@PathVariable String department) {
        log.info("REST request to get employees in department: {}", department);
        return employeeService.getEmployeesByDepartment(department);
    }
    
    /**
     * Find high earners
     * GET /api/employees/high-earners?minSalary=50000
     */
    @GetMapping("/high-earners")
    public Flux<Employee> findHighEarners(@RequestParam Double minSalary) {
        log.info("REST request to find high earners with salary >= {}", minSalary);
        return employeeService.findHighEarners(minSalary);
    }
    
    /**
     * Update employee
     * PUT /api/employees/{id}
     */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<Employee>> updateEmployee(
            @PathVariable String id,
            @Valid @RequestBody Employee employee) {
        log.info("REST request to update employee with ID: {}", id);
        
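        // Note: updateEmployee signals EmployeeNotFoundException for an unknown ID,
        // so defaultIfEmpty is only a safety net and the 404 must come from an error handler.
        return employeeService.updateEmployee(id, employee)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());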
    }
    
    /**
     * Delete employee
     * DELETE /api/employees/{id}
     */
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteEmployee(@PathVariable String id) {
        log.info("REST request to delete employee with ID: {}", id);
        return employeeService.deleteEmployee(id);
    }
}

Key Features:

  • @RestController: Marks this as a REST controller
  • @RequestMapping: Base path for all endpoints
  • Methods return Mono or Flux instead of regular objects
  • MediaType.TEXT_EVENT_STREAM_VALUE: Sends the response as Server-Sent Events, so each element is written to the client as soon as it is emitted
  • @Valid: Triggers bean validation

API Endpoints:

  • POST /api/employees - Create employee
  • GET /api/employees - Get all employees
  • GET /api/employees/{id} - Get employee by ID
  • GET /api/employees/stream - Stream employees (SSE)
  • GET /api/employees/department/{dept} - Get by department
  • GET /api/employees/high-earners?minSalary=X - Find high earners
  • PUT /api/employees/{id} - Update employee
  • DELETE /api/employees/{id} - Delete employee

8. Functional Endpoints: An Alternative Approach

WebFlux provides a functional programming style for defining routes, which is an alternative to the annotation-based approach. This style uses RouterFunction and HandlerFunction.

8.1 Understanding Functional Endpoints

Key Concepts:

  • HandlerFunction: A function that handles an HTTP request and returns a Mono<ServerResponse>
  • RouterFunction: Routes requests to handler functions (equivalent to @RequestMapping)
  • ServerRequest: Immutable representation of HTTP request
  • ServerResponse: Immutable representation of HTTP response

Advantages:

  • More explicit routing
  • Better testability
  • Functional composition
  • No hidden magic from annotations

8.2 Handler Class

package com.example.webflux.handler;

import com.example.webflux.model.Employee;
import com.example.webflux.service.EmployeeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.net.URI;

@Component
@RequiredArgsConstructor
@Slf4j
public class EmployeeHandler {
    
    private final EmployeeService employeeService;
    
    /**
     * Get all employees
     */
    public Mono<ServerResponse> getAllEmployees(ServerRequest request) {
        log.info("Functional: Get all employees");
        
        return ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(employeeService.getAllEmployees(), Employee.class);
    }
    
    /**
     * Get employee by ID
     */
    public Mono<ServerResponse> getEmployeeById(ServerRequest request) {
        String id = request.pathVariable("id");
        log.info("Functional: Get employee by ID: {}", id);
        
        return employeeService.getEmployeeById(id)
            .flatMap(employee -> ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(employee))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    
    /**
     * Create employee
     */
    public Mono<ServerResponse> createEmployee(ServerRequest request) {
        log.info("Functional: Create employee");
        
        Mono<Employee> employeeMono = request.bodyToMono(Employee.class);
        
        return employeeMono
            .flatMap(employeeService::createEmployee)
            .flatMap(createdEmployee -> ServerResponse
                .created(URI.create("/api/functional/employees/" + createdEmployee.getId()))
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(createdEmployee))
            .onErrorResume(IllegalArgumentException.class, error ->
                ServerResponse.badRequest()
                    .bodyValue("Error: " + error.getMessage()));
    }
    
    /**
     * Update employee
     */
    public Mono<ServerResponse> updateEmployee(ServerRequest request) {
        String id = request.pathVariable("id");
        log.info("Functional: Update employee with ID: {}", id);
        
        Mono<Employee> employeeMono = request.bodyToMono(Employee.class);
        
        return employeeMono
            .flatMap(employee -> employeeService.updateEmployee(id, employee))
            .flatMap(updated -> ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(updated))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    
    /**
     * Delete employee
     */
    public Mono<ServerResponse> deleteEmployee(ServerRequest request) {
        String id = request.pathVariable("id");
        log.info("Functional: Delete employee with ID: {}", id);
        
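        // Mono<Void> completes without emitting a value, so a flatMap here would
        // never run; then(...) switches to the response once the delete completes.
        return employeeService.deleteEmployee(id)
            .then(ServerResponse.noContent().build())
            .onErrorResume(e -> ServerResponse.notFound().build());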
    }
    
    /**
     * Get employees by department
     */
    public Mono<ServerResponse> getByDepartment(ServerRequest request) {
        String department = request.pathVariable("department");
        log.info("Functional: Get employees in department: {}", department);
        
        return ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(employeeService.getEmployeesByDepartment(department), Employee.class);
    }
    
    /**
     * Stream employees with SSE
     */
    public Mono<ServerResponse> streamEmployees(ServerRequest request) {
        log.info("Functional: Stream employees");
        
        return ServerResponse
            .ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(employeeService.getAllEmployees(), Employee.class);
    }
}

8.3 Router Configuration

package com.example.webflux.router;

import com.example.webflux.handler.EmployeeHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;

@Configuration
public class EmployeeRouter {
    
    @Bean
    public RouterFunction<ServerResponse> employeeRoutes(EmployeeHandler handler) {
        
        // Routes are evaluated in declaration order, so the literal /stream route
        // is declared before the /{id} pattern that would otherwise also match it.
        return RouterFunctions
            .route(GET("/api/functional/employees")
                .and(accept(MediaType.APPLICATION_JSON)), handler::getAllEmployees)

            .andRoute(GET("/api/functional/employees/stream")
                .and(accept(MediaType.TEXT_EVENT_STREAM)), handler::streamEmployees)

            .andRoute(GET("/api/functional/employees/department/{department}")
                .and(accept(MediaType.APPLICATION_JSON)), handler::getByDepartment)

            .andRoute(GET("/api/functional/employees/{id}")
                .and(accept(MediaType.APPLICATION_JSON)), handler::getEmployeeById)

            .andRoute(POST("/api/functional/employees")
                .and(contentType(MediaType.APPLICATION_JSON)), handler::createEmployee)

            .andRoute(PUT("/api/functional/employees/{id}")
                .and(contentType(MediaType.APPLICATION_JSON)), handler::updateEmployee)

            .andRoute(DELETE("/api/functional/employees/{id}"), handler::deleteEmployee);
    }
}

Alternative Router Style (Nested):

@Bean
public RouterFunction<ServerResponse> employeeRoutesNested(EmployeeHandler handler) {
    
    return RouterFunctions.route()
        .path("/api/functional/employees", builder -> builder
            .GET("", handler::getAllEmployees)
            // Literal paths first: "/{id}" would otherwise capture "/stream"
            .GET("/stream", handler::streamEmployees)
            .GET("/department/{department}", handler::getByDepartment)
            .GET("/{id}", handler::getEmployeeById)
            .POST("", handler::createEmployee)
            .PUT("/{id}", handler::updateEmployee)
            .DELETE("/{id}", handler::deleteEmployee)
        )
        .build();
}
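
Route functions are evaluated in the order they are declared, and the first match wins, so a literal path such as /stream should be declared before a pattern such as /{id}. The following is a minimal, JDK-only sketch of that behaviour. FirstMatchRouter, its get and resolve methods, and the handler names are hypothetical stand-ins for illustration, not Spring's RouterFunction API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical stand-in for a router: routes are tried in declaration order.
final class FirstMatchRouter {

    // Ordered list of (path pattern, handler name) pairs.
    private final List<Map.Entry<Pattern, String>> routes = new ArrayList<>();

    // Registers a GET route; a "{name}" segment matches any single path segment.
    FirstMatchRouter get(String template, String handlerName) {
        String regex = template.replaceAll("\\{[^/]+\\}", "[^/]+");
        routes.add(Map.entry(Pattern.compile(regex), handlerName));
        return this;
    }

    // Returns the first handler whose pattern matches the whole path.
    Optional<String> resolve(String path) {
        return routes.stream()
            .filter(route -> route.getKey().matcher(path).matches())
            .map(Map.Entry::getValue)
            .findFirst();
    }

    public static void main(String[] args) {
        FirstMatchRouter variableFirst = new FirstMatchRouter()
            .get("/employees/{id}", "getEmployeeById")
            .get("/employees/stream", "streamEmployees");

        FirstMatchRouter literalFirst = new FirstMatchRouter()
            .get("/employees/stream", "streamEmployees")
            .get("/employees/{id}", "getEmployeeById");

        // The same request resolves differently depending on declaration order.
        System.out.println(variableFirst.resolve("/employees/stream").orElse("none"));
        System.out.println(literalFirst.resolve("/employees/stream").orElse("none"));
    }
}
```

With the variable route declared first, a request for /employees/stream resolves to getEmployeeById (with "stream" treated as the ID). Declaring the literal route first resolves it to streamEmployees.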

Comparison:

Feature        | Annotation-Based        | Functional
Style          | Declarative             | Functional
Routing        | @RequestMapping         | RouterFunction
Handler        | Controller methods      | HandlerFunction
Testing        | Requires Spring context | Can test pure functions
Flexibility    | Less flexible           | Highly composable
Learning Curve | Easier (familiar)       | Requires functional knowledge

9. Reactive Database Operations with MongoDB

To create a truly non-blocking application, the database layer must also be reactive.

9.1 Why Reactive Databases?

Traditional JDBC connections are blocking. If you use blocking database operations in a WebFlux application, you negate the benefits of reactive programming.

Reactive Database Options:

  • MongoDB with Reactive Streams driver
  • R2DBC (Reactive Relational Database Connectivity) for SQL databases
  • Cassandra with Reactive driver
  • Redis with Reactive driver

9.2 Advanced Repository Operations

package com.example.webflux.repository;

import com.example.webflux.model.Employee;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.Aggregation;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {
    
    // Pagination
    Flux<Employee> findByDepartment(String department, Pageable pageable);
    
    // Count operations
    Mono<Long> countByDepartment(String department);
    
    // Delete operations
    Mono<Long> deleteByDepartment(String department);
    
    // Aggregation example
    @Aggregation(pipeline = {
        "{ $group: { _id: '$department', avgSalary: { $avg: '$salary' } } }",
        "{ $sort: { avgSalary: -1 } }"
    })
    // DepartmentSalaryStats (not shown) is a projection type for the pipeline output
    Flux<DepartmentSalaryStats> getAverageSalaryByDepartment();
}

9.3 Using ReactiveMongoTemplate

For complex queries, use ReactiveMongoTemplate:

package com.example.webflux.service;

import com.example.webflux.model.Employee;
import lombok.RequiredArgsConstructor;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
public class EmployeeQueryService {
    
    private final ReactiveMongoTemplate mongoTemplate;
    
    /**
     * Complex query with multiple criteria
     */
    public Flux<Employee> findEmployeesByCriteria(
            String department, 
            Double minSalary, 
            Double maxSalary) {
        
        Query query = new Query();
        query.addCriteria(Criteria.where("department").is(department)
            .and("salary").gte(minSalary).lte(maxSalary));
        
        return mongoTemplate.find(query, Employee.class);
    }
    
    /**
     * Update specific field
     */
    public Mono<Long> updateSalaryForDepartment(String department, Double increment) {
        Query query = Query.query(Criteria.where("department").is(department));
        Update update = new Update().inc("salary", increment);
        
        return mongoTemplate.updateMulti(query, update, Employee.class)
            .map(result -> result.getModifiedCount());
    }
    
    /**
     * Count with criteria
     */
    public Mono<Long> countHighEarners(Double threshold) {
        Query query = Query.query(Criteria.where("salary").gte(threshold));
        return mongoTemplate.count(query, Employee.class);
    }
    
    /**
     * Find distinct departments
     */
    public Flux<String> findDistinctDepartments() {
        return mongoTemplate.findDistinct(
            new Query(), 
            "department", 
            Employee.class, 
            String.class
        );
    }
}

10. Error Handling in Reactive Applications

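Error handling in reactive streams differs from traditional try-catch blocks: an error is a terminal signal that travels downstream through the pipeline, so it is handled with operators rather than by catching an exception at the call site.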

10.1 Reactive Error Handling Operators

1. onErrorReturn() - Static Fallback Value

public Mono<Employee> getEmployeeWithFallback(String id) {
    return employeeRepository.findById(id)
        .onErrorReturn(new Employee("unknown", "Unknown", "N/A", 0.0, "unknown@example.com"));
}

2. onErrorResume() - Dynamic Fallback

public Mono<Employee> getEmployeeWithDynamicFallback(String id) {
    return employeeRepository.findById(id)
        .onErrorResume(error -> {
            log.error("Error fetching employee", error);
            
            if (error instanceof EmployeeNotFoundException) {
                // Return default employee
                return Mono.just(new Employee("default", "Default", "N/A", 0.0, "default@example.com"));
            } else {
                // Re-throw other errors
                return Mono.error(error);
            }
        });
}

3. onErrorContinue() - Skip Errors and Continue

public Flux<Employee> getAllEmployeesSkipErrors() {
    return employeeRepository.findAll()
        .map(this::processEmployee)  // Might throw exception
        .onErrorContinue((error, value) -> {
            log.warn("Skipping employee due to error: {}", error.getMessage());
        });
}

4. onErrorComplete() - Convert Error to Completion

public Mono<Void> deleteEmployeeQuietly(String id) {
    return employeeRepository.deleteById(id)
        .onErrorComplete();  // Ignore errors, just complete
}

5. doOnError() - Side Effect on Error

public Mono<Employee> getEmployeeWithLogging(String id) {
    return employeeRepository.findById(id)
        .doOnError(error -> log.error("Failed to fetch employee: {}", id, error));
}

10.2 Global Exception Handler

package com.example.webflux.exception;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;

import java.util.Map;

@Component
@Slf4j
public class GlobalErrorAttributes extends DefaultErrorAttributes {
    
    @Override
    public Map<String, Object> getErrorAttributes(
            ServerRequest request, 
            ErrorAttributeOptions options) {
        
        Map<String, Object> errorAttributes = super.getErrorAttributes(request, options);
        Throwable error = getError(request);
        
        if (error instanceof EmployeeNotFoundException) {
            errorAttributes.put("status", HttpStatus.NOT_FOUND.value());
            errorAttributes.put("error", "Not Found");
            errorAttributes.put("message", error.getMessage());
        } else if (error instanceof IllegalArgumentException) {
            errorAttributes.put("status", HttpStatus.BAD_REQUEST.value());
            errorAttributes.put("error", "Bad Request");
            errorAttributes.put("message", error.getMessage());
        } else {
            errorAttributes.put("status", HttpStatus.INTERNAL_SERVER_ERROR.value());
            errorAttributes.put("error", "Internal Server Error");
            errorAttributes.put("message", "An unexpected error occurred");
        }
        
        log.error("Error occurred: ", error);
        return errorAttributes;
    }
}

Global Error Handler:

package com.example.webflux.exception;

import org.springframework.boot.autoconfigure.web.WebProperties;
import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.*;
import reactor.core.publisher.Mono;

import java.util.Map;

@Component
@Order(-2)
public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
    
    public GlobalErrorWebExceptionHandler(
            GlobalErrorAttributes errorAttributes,
            WebProperties webProperties,
            ApplicationContext applicationContext,
            ServerCodecConfigurer serverCodecConfigurer) {
        
        super(errorAttributes, webProperties.getResources(), applicationContext);
        super.setMessageWriters(serverCodecConfigurer.getWriters());
        super.setMessageReaders(serverCodecConfigurer.getReaders());
    }
    
    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(
            ErrorAttributes errorAttributes) {
        
        return RouterFunctions.route(
            RequestPredicates.all(), 
            this::renderErrorResponse
        );
    }
    
    private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
        
        Map<String, Object> errorPropertiesMap = getErrorAttributes(
            request, 
            ErrorAttributeOptions.defaults()
        );
        
        int status = (int) errorPropertiesMap.getOrDefault("status", 500);
        
        return ServerResponse
            .status(HttpStatus.valueOf(status))
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(errorPropertiesMap));
    }
}

10.3 Custom Exceptions

package com.example.webflux.exception;

public class EmployeeNotFoundException extends RuntimeException {
    public EmployeeNotFoundException(String message) {
        super(message);
    }
}

11. Backpressure Management

Backpressure is a mechanism that allows consumers to signal producers to slow down when they’re overwhelmed.

11.1 Understanding Backpressure

In reactive streams:

  • Fast Producer: Generates data quickly
  • Slow Consumer: Processes data slowly
  • Problem: Consumer gets overwhelmed

Solution: Backpressure

  • Consumer requests specific number of items
  • Producer only sends what’s requested
  • Prevents overflow and resource exhaustion
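
The request/emit handshake described above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract. This is a simplified, single-threaded illustration. RangePublisher, BatchSubscriber, and BackpressureDemo are hypothetical names, and a production publisher would also need thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Single-threaded sketch: publishes 1..count, but never more than was requested.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) {
                    return; // re-entrant request from onNext; the outer loop keeps draining
                }
                emitting = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Subscriber that pulls items in fixed-size batches and records every request it makes.
final class BatchSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final List<Integer> requests = new ArrayList<>();
    boolean completed = false;
    Throwable error;

    private final int batchSize;
    private Flow.Subscription subscription;
    private int pending = 0;

    BatchSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        requestBatch();
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        if (--pending == 0) {
            requestBatch(); // batch fully consumed: signal demand for the next one
        }
    }

    @Override
    public void onError(Throwable throwable) {
        error = throwable;
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    private void requestBatch() {
        pending = batchSize;
        requests.add(batchSize);
        subscription.request(batchSize);
    }
}

final class BackpressureDemo {
    public static void main(String[] args) {
        BatchSubscriber subscriber = new BatchSubscriber(3);
        new RangePublisher(7).subscribe(subscriber);
        System.out.println("received = " + subscriber.received);
        System.out.println("requests = " + subscriber.requests);
    }
}
```

Running BackpressureDemo delivers all seven items across three requests of three. The publisher's loop stops whenever outstanding demand reaches zero, which is the property that prevents a slow consumer from being flooded.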

11.2 Backpressure Strategies

1. limitRate() - Control Demand

@GetMapping("/employees/limited")
public Flux<Employee> getEmployeesWithRateLimit() {
    return employeeService.getAllEmployees()
        .limitRate(10);  // Request 10 items at a time
}

How it works:

  • Even if the subscriber requests Long.MAX_VALUE, limitRate chunks the demand
  • Requests 10 elements up front; once 75% of them have been consumed (8 of 10 with Reactor's integer rounding), requests 8 more
  • Maintains a sliding window
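
The replenishing arithmetic can be simulated without Reactor. The sketch below assumes the threshold is computed as prefetch - (prefetch >> 2), which is how Reactor's implementation rounds the 75% mark at the time of writing. This is an internal detail that may change, and LimitRateSimulation is a hypothetical helper rather than a Reactor class. Note that for a prefetch of 10 the threshold works out to 8.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the upstream request sizes produced by a limitRate-style window.
final class LimitRateSimulation {

    // Assumed replenish threshold: 75% of the prefetch in integer arithmetic.
    static int replenishThreshold(int prefetch) {
        return prefetch - (prefetch >> 2);
    }

    // Returns the sequence of upstream requests made while 'total' items are consumed.
    static List<Integer> upstreamRequests(int prefetch, int total) {
        List<Integer> requests = new ArrayList<>();
        int threshold = replenishThreshold(prefetch);
        requests.add(prefetch); // initial demand
        int consumedSinceRequest = 0;
        for (int i = 0; i < total; i++) {
            consumedSinceRequest++;
            if (consumedSinceRequest == threshold) {
                requests.add(threshold); // top the window back up
                consumedSinceRequest = 0;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(replenishThreshold(10));
        System.out.println(upstreamRequests(10, 20));
    }
}
```

For a prefetch of 10 and 20 consumed items, the simulated upstream demand is 10, then 8, then 8. The downstream subscriber's own request size never reaches the source directly.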

2. onBackpressureBuffer() - Buffer Overflow

public Flux<Employee> getEmployeesWithBuffer() {
    return employeeRepository.findAll()
        .onBackpressureBuffer(100)  // Buffer up to 100 elements
        .doOnNext(emp -> processSlowly(emp));
}

Warning: If the buffer fills up, the stream terminates with an overflow error (an IllegalStateException that Exceptions.isOverflow() recognizes)

With Custom Strategy:

public Flux<Employee> getEmployeesWithSmartBuffer() {
    return employeeRepository.findAll()
        .onBackpressureBuffer(
            100,  // Buffer size
            emp -> log.warn("Buffer overflow, dropping: {}", emp),  // On overflow
            BufferOverflowStrategy.DROP_LATEST  // Drop newest items
        );
}

Buffer Strategies:

  • DROP_LATEST: Drop newest items when buffer is full
  • DROP_OLDEST: Drop oldest items when buffer is full
  • ERROR: Throw error when buffer is full
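
To make the difference between the three strategies concrete, here is a small JDK-only model of a bounded buffer whose consumer is not draining. BoundedBufferSketch and its Strategy enum are hypothetical and only mimic the behaviour described above. They are not Reactor's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative bounded buffer that applies an overflow strategy when full.
final class BoundedBufferSketch<T> {

    enum Strategy { DROP_LATEST, DROP_OLDEST, ERROR }

    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<T> dropped = new ArrayList<>();
    private final int capacity;
    private final Strategy strategy;

    BoundedBufferSketch(int capacity, Strategy strategy) {
        this.capacity = capacity;
        this.strategy = strategy;
    }

    // Offers an item while the (slow) consumer is not draining the buffer.
    void offer(T item) {
        if (buffer.size() < capacity) {
            buffer.addLast(item);
            return;
        }
        switch (strategy) {
            case DROP_LATEST:
                dropped.add(item); // discard the incoming item
                break;
            case DROP_OLDEST:
                dropped.add(buffer.removeFirst()); // evict the oldest buffered item
                buffer.addLast(item);
                break;
            default:
                throw new IllegalStateException("Buffer overflow (capacity " + capacity + ")");
        }
    }

    List<T> contents() {
        return new ArrayList<>(buffer);
    }

    List<T> dropped() {
        return new ArrayList<>(dropped);
    }

    public static void main(String[] args) {
        BoundedBufferSketch<Integer> latest = new BoundedBufferSketch<>(3, Strategy.DROP_LATEST);
        BoundedBufferSketch<Integer> oldest = new BoundedBufferSketch<>(3, Strategy.DROP_OLDEST);
        for (int i = 1; i <= 5; i++) {
            latest.offer(i);
            oldest.offer(i);
        }
        System.out.println("DROP_LATEST keeps " + latest.contents() + ", drops " + latest.dropped());
        System.out.println("DROP_OLDEST keeps " + oldest.contents() + ", drops " + oldest.dropped());
    }
}
```

Offering items 1 to 5 into a buffer of size 3 leaves [1, 2, 3] under DROP_LATEST and [3, 4, 5] under DROP_OLDEST. ERROR fails on the fourth item.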

3. onBackpressureDrop() - Drop Excess Items

public Flux<Employee> getEmployeesWithDrop() {
    return employeeRepository.findAll()
        .onBackpressureDrop(emp -> 
            log.warn("Dropping employee due to backpressure: {}", emp.getId())
        );
}

Use Case: Suitable for telemetry, logs, or metrics where occasional data loss is acceptable

4. onBackpressureLatest() - Keep Only Latest

public Flux<Employee> getEmployeesLatestOnly() {
    return employeeRepository.findAll()
        .onBackpressureLatest();  // Keep only the most recent item
}

5. delayElements() - Slow Down Producer

@GetMapping(value = "/employees/throttled", 
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Employee> getEmployeesThrottled() {
    return employeeService.getAllEmployees()
        .delayElements(Duration.ofMillis(100));  // Emit every 100ms
}

11.3 Custom Backpressure Handling

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class CustomBackpressureSubscriber extends BaseSubscriber<Employee> {
    
    private int consumed = 0;
    private final int batchSize = 5;
    
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(batchSize);  // Initial request
    }
    
    @Override
    protected void hookOnNext(Employee employee) {
        // Process employee
        System.out.println("Processing: " + employee.getName());
        
        consumed++;
        
        if (consumed == batchSize) {
            consumed = 0;
            request(batchSize);  // Request next batch
        }
    }
    
    @Override
    protected void hookOnError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }
    
    @Override
    protected void hookOnComplete() {
        System.out.println("Processing complete");
    }
}

Usage:

public void processEmployeesWithCustomBackpressure() {
    employeeRepository.findAll()
        .subscribe(new CustomBackpressureSubscriber());
}

12. Testing WebFlux Applications

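Testing reactive applications requires tools that subscribe to a publisher and assert on the signals it emits: WebTestClient for HTTP endpoints and StepVerifier for Mono and Flux pipelines.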

12.1 Using WebTestClient

WebTestClient is a non-blocking, reactive client for testing WebFlux applications.

Test Configuration:

package com.example.webflux;

import com.example.webflux.controller.EmployeeController;
import com.example.webflux.model.Employee;
import com.example.webflux.service.EmployeeService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;

@WebFluxTest(EmployeeController.class)
class EmployeeControllerTest {
    
    @Autowired
    private WebTestClient webTestClient;
    
    @MockBean
    private EmployeeService employeeService;
    
    private Employee testEmployee;
    
    @BeforeEach
    void setUp() {
        testEmployee = new Employee(
            "1", 
            "John Doe", 
            "Engineering", 
            75000.0, 
            "john@example.com"
        );
    }
    
    @Test
    void testCreateEmployee() {
        when(employeeService.createEmployee(any(Employee.class)))
            .thenReturn(Mono.just(testEmployee));
        
        webTestClient.post()
            .uri("/api/employees")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(testEmployee)
            .exchange()
            .expectStatus().isCreated()
            .expectBody(Employee.class)
            .isEqualTo(testEmployee);
    }
    
    @Test
    void testGetEmployeeById() {
        when(employeeService.getEmployeeById("1"))
            .thenReturn(Mono.just(testEmployee));
        
        webTestClient.get()
            .uri("/api/employees/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody()
            .jsonPath("$.id").isEqualTo("1")
            .jsonPath("$.name").isEqualTo("John Doe")
            .jsonPath("$.department").isEqualTo("Engineering")
            .jsonPath("$.salary").isEqualTo(75000.0);
    }
    
    @Test
    void testGetEmployeeById_NotFound() {
        when(employeeService.getEmployeeById("999"))
            .thenReturn(Mono.empty());
        
        webTestClient.get()
            .uri("/api/employees/999")
            .exchange()
            .expectStatus().isNotFound();
    }
    
    @Test
    void testGetAllEmployees() {
        Employee emp1 = new Employee("1", "Alice", "HR", 60000.0, "alice@example.com");
        Employee emp2 = new Employee("2", "Bob", "IT", 70000.0, "bob@example.com");
        
        when(employeeService.getAllEmployees())
            .thenReturn(Flux.just(emp1, emp2));
        
        webTestClient.get()
            .uri("/api/employees")
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(Employee.class)
            .hasSize(2)
            .contains(emp1, emp2);
    }
    
    @Test
    void testUpdateEmployee() {
        Employee updatedEmployee = new Employee(
            "1", 
            "John Smith", 
            "Engineering", 
            80000.0, 
            "john.smith@example.com"
        );
        
        when(employeeService.updateEmployee(anyString(), any(Employee.class)))
            .thenReturn(Mono.just(updatedEmployee));
        
        webTestClient.put()
            .uri("/api/employees/1")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(updatedEmployee)
            .exchange()
            .expectStatus().isOk()
            .expectBody()
            .jsonPath("$.name").isEqualTo("John Smith")
            .jsonPath("$.salary").isEqualTo(80000.0);
    }
    
    @Test
    void testDeleteEmployee() {
        when(employeeService.deleteEmployee("1"))
            .thenReturn(Mono.empty());
        
        webTestClient.delete()
            .uri("/api/employees/1")
            .exchange()
            .expectStatus().isNoContent();
    }
}

12.2 Testing Reactive Streams with StepVerifier

StepVerifier is used to test Publisher implementations (Mono and Flux).

package com.example.webflux.service;

import com.example.webflux.exception.EmployeeNotFoundException;
import com.example.webflux.model.Employee;
import com.example.webflux.repository.EmployeeRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class EmployeeServiceTest {
    
    @Mock
    private EmployeeRepository employeeRepository;
    
    @InjectMocks
    private EmployeeServiceImpl employeeService;
    
    @Test
    void testCreateEmployee() {
        Employee employee = new Employee(
            null, 
            "Jane Doe", 
            "Marketing", 
            65000.0, 
            "jane@example.com"
        );
        
        Employee savedEmployee = new Employee(
            "123", 
            "Jane Doe", 
            "Marketing", 
            65000.0, 
            "jane@example.com"
        );
        
        when(employeeRepository.existsByEmail(any())).thenReturn(Mono.just(false));
        when(employeeRepository.save(any(Employee.class))).thenReturn(Mono.just(savedEmployee));
        
        Mono<Employee> result = employeeService.createEmployee(employee);
        
        StepVerifier.create(result)
            .expectNextMatches(emp -> 
                emp.getId().equals("123") && 
                emp.getName().equals("Jane Doe")
            )
            .verifyComplete();
    }
    
    @Test
    void testGetAllEmployees() {
        Employee emp1 = new Employee("1", "Alice", "HR", 60000.0, "alice@example.com");
        Employee emp2 = new Employee("2", "Bob", "IT", 70000.0, "bob@example.com");
        Employee emp3 = new Employee("3", "Charlie", "Sales", 55000.0, "charlie@example.com");
        
        when(employeeRepository.findAll()).thenReturn(Flux.just(emp1, emp2, emp3));
        
        Flux<Employee> result = employeeService.getAllEmployees();
        
        StepVerifier.create(result)
            .expectNext(emp1)
            .expectNext(emp2)
            .expectNext(emp3)
            .verifyComplete();
    }
    
    @Test
    void testGetEmployeeById_NotFound() {
        when(employeeRepository.findById("999")).thenReturn(Mono.empty());
        
        Mono<Employee> result = employeeService.getEmployeeById("999");
        
        StepVerifier.create(result)
            .expectError(EmployeeNotFoundException.class)
            .verify();
    }
    
    @Test
    void testFluxWithError() {
        when(employeeRepository.findAll())
            .thenReturn(Flux.error(new RuntimeException("Database error")));
        
        Flux<Employee> result = employeeService.getAllEmployees();
        
        StepVerifier.create(result)
            .expectError(RuntimeException.class)
            .verify();
    }
    
    @Test
    void testFluxWithDelay() {
        Employee emp = new Employee("1", "Test", "IT", 50000.0, "test@example.com");
        
        Flux<Employee> delayedFlux = Flux.just(emp)
            .delayElements(Duration.ofSeconds(1));
        
        StepVerifier.create(delayedFlux)
            .expectNext(emp)
            .expectComplete()
            .verify(Duration.ofSeconds(2));  // Timeout after 2 seconds
    }
}

12.3 Integration Testing

package com.example.webflux;

import com.example.webflux.model.Employee;
import com.example.webflux.repository.EmployeeRepository;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class EmployeeIntegrationTest {
    
    @Autowired
    private WebTestClient webTestClient;
    
    @Autowired
    private EmployeeRepository employeeRepository;
    
    @BeforeEach
    void setUp() {
        employeeRepository.deleteAll().block();
    }
    
    @AfterEach
    void tearDown() {
        employeeRepository.deleteAll().block();
    }
    
    @Test
    void testCreateAndRetrieveEmployee() {
        Employee employee = new Employee(
            null, 
            "Integration Test", 
            "QA", 
            70000.0, 
            "integration@test.com"
        );
        
        // Create employee
        webTestClient.post()
            .uri("/api/employees")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(employee)
            .exchange()
            .expectStatus().isCreated()
            .expectBody()
            .jsonPath("$.id").isNotEmpty()
            .jsonPath("$.name").isEqualTo("Integration Test");
        
        // Verify it's in database
        webTestClient.get()
            .uri("/api/employees")
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(Employee.class)
            .hasSize(1);
    }
}

13. Performance Comparison: Spring MVC vs WebFlux

13.1 When WebFlux Performs Better

Scenarios Favoring WebFlux:

  1. High Concurrency: 1000+ concurrent connections
  2. I/O-Bound Operations: Heavy database or external API calls
  3. Streaming Data: Real-time updates, SSE, WebSocket
  4. Microservices: Service-to-service communication
  5. Long-Running Requests: Keeps connections open efficiently

13.2 Performance Characteristics

Unsaturated State (Low Load):

  • Spring MVC and WebFlux have similar performance
  • Response times are comparable
  • Both handle requests efficiently

Saturated State (High Load):

  • Spring MVC: Thread pool exhaustion occurs earlier
  • WebFlux: Handles higher concurrency with fewer resources
  • Memory: WebFlux uses less memory (fewer threads)
  • Throughput: WebFlux maintains better throughput under load
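
The earlier saturation of a thread-per-request server follows from Little's law: sustained throughput cannot exceed the number of worker threads divided by the time each request holds a thread. A back-of-the-envelope sketch follows. The pool size of 200 and the latencies are illustrative assumptions, and ThreadPoolCeiling is a hypothetical helper.

```java
// Idealised upper bound on throughput for a blocking, thread-per-request server.
final class ThreadPoolCeiling {

    // Little's law: concurrency = throughput x latency,
    // so throughput = concurrency / latency.
    static double maxRequestsPerSecond(int workerThreads, double latencyMillis) {
        return workerThreads * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // Assumed pool of 200 threads, each blocked for the full request latency.
        System.out.println(maxRequestsPerSecond(200, 100));   // fast downstream call
        System.out.println(maxRequestsPerSecond(200, 1000));  // slow downstream call
    }
}
```

Under these assumptions, 200 threads sustain at most 2000 requests per second when each request takes 100 ms, and only 200 per second when a slow dependency stretches that to one second. A non-blocking server does not hold a thread while waiting, so this particular ceiling does not apply to it.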

13.3 Load Test Results Summary

Indicative figures drawn from industry benchmarks (actual numbers depend on workload, hardware, and configuration):

Metric                    | Spring MVC                | WebFlux
Low Concurrency (100-200) | Similar                   | Similar
Saturation Point          | ~600-900 requests         | ~900-1500 requests
Memory Usage              | Higher (thread overhead)  | Lower (event loop)
CPU Efficiency            | Lower (context switching) | Higher (fewer threads)
Scalability               | Limited by thread pool    | Scales with connections

14. Best Practices and When to Use WebFlux

14.1 When to Use WebFlux

✅ Use WebFlux When:

  1. High Concurrency Requirements: Thousands of concurrent users
  2. Real-Time Applications: Chat apps, notifications, live dashboards
  3. Streaming Data: Video/audio streaming, data feeds
  4. Microservices Communication: High inter-service communication
  5. I/O Heavy Operations: Lots of database/API calls
  6. Scalability is Critical: Need to handle unpredictable load spikes

❌ Avoid WebFlux When:

  1. Traditional CRUD Apps: Simple applications with moderate load
  2. Blocking Dependencies: Using JDBC, blocking libraries
  3. Team Unfamiliar with Reactive: Steep learning curve
  4. CPU-Intensive Tasks: Computation-heavy operations
  5. Legacy Codebase: Migrating would be too costly

14.2 Best Practices

1. Never Block in Reactive Chains

// ❌ BAD - Blocking call
public Mono<Employee> getEmployeeBad(String id) {
    return Mono.fromCallable(() -> {
        Thread.sleep(1000);  // BLOCKS THREAD!
        return employeeRepository.findById(id).block();  // BLOCKS!
    });
}

// ✅ GOOD - Non-blocking
public Mono<Employee> getEmployeeGood(String id) {
    return employeeRepository.findById(id)
        .delayElement(Duration.ofSeconds(1));  // Non-blocking delay
}

2. Use Appropriate Schedulers

// For CPU-intensive tasks
public Mono<Result> heavyComputation() {
    return Mono.fromCallable(() -> computeIntensiveTask())
        .subscribeOn(Schedulers.parallel());
}

// For blocking I/O (as last resort)
public Mono<String> legacyBlockingCall() {
    return Mono.fromCallable(() -> legacyService.blockingMethod())
        .subscribeOn(Schedulers.boundedElastic());
}

3. Handle Errors Properly

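public Mono<Employee> getEmployeeRobust(String id) {
    return employeeRepository.findById(id)
        // Defer so the exception is only created when the source is actually empty
        .switchIfEmpty(Mono.defer(() ->
            Mono.error(new EmployeeNotFoundException("Not found: " + id))))
        // Log before recovering; placed after onErrorResume, this would never see DatabaseException
        .doOnError(e -> log.error("Error fetching employee {}", id, e))
        .onErrorResume(DatabaseException.class, e ->
            Mono.just(getDefaultEmployee()));
}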

4. Use Proper Testing

// Always use StepVerifier for testing reactive streams
@Test
void testReactiveMethod() {
    Flux<String> result = service.getNames();
    
    StepVerifier.create(result)
        .expectNext("Alice", "Bob", "Charlie")
        .verifyComplete();
}

5. Optimize Database Queries

// ❌ BAD - N+1 queries
public Flux<EmployeeDTO> getEmployeesWithDeptBad() {
    return employeeRepository.findAll()
        .flatMap(emp -> departmentRepository.findById(emp.getDeptId())
            .map(dept -> new EmployeeDTO(emp, dept))
        );
}

// ✅ GOOD - Batch queries
public Flux<EmployeeDTO> getEmployeesWithDeptGood() {
    return employeeRepository.findAll()
        .collectList()
        .flatMapMany(employees -> {
            Set<String> deptIds = employees.stream()
                .map(Employee::getDeptId)
                .collect(Collectors.toSet());
            
            return departmentRepository.findAllById(deptIds)
                .collectMap(Department::getId)
                .flatMapMany(deptMap -> 
                    Flux.fromIterable(employees)
                        .map(emp -> new EmployeeDTO(emp, deptMap.get(emp.getDeptId())))
                );
        });
}
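
To make the difference in round trips concrete, here is a small JDK-only sketch that counts department lookups for both strategies. It is an illustration under assumptions, not the repository code above: `CountingDepartmentStore` is a hypothetical in-memory stand-in for `departmentRepository`, employees are reduced to their department IDs, and departments to their names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class QueryCountDemo {

    // Hypothetical in-memory stand-in for a department repository; it only counts round trips.
    static class CountingDepartmentStore {
        private final Map<String, String> namesById;
        private int roundTrips = 0;

        CountingDepartmentStore(Map<String, String> namesById) {
            this.namesById = namesById;
        }

        String findById(String id) {
            roundTrips++;
            return namesById.get(id);
        }

        Map<String, String> findAllById(Set<String> ids) {
            roundTrips++;
            Map<String, String> found = new HashMap<>();
            for (String id : ids) {
                if (namesById.containsKey(id)) {
                    found.put(id, namesById.get(id));
                }
            }
            return found;
        }

        int roundTrips() {
            return roundTrips;
        }
    }

    // One lookup per employee: N round trips for N employees.
    static List<String> joinPerEmployee(List<String> employeeDeptIds, CountingDepartmentStore store) {
        List<String> deptNames = new ArrayList<>();
        for (String deptId : employeeDeptIds) {
            deptNames.add(store.findById(deptId));
        }
        return deptNames;
    }

    // Collect the distinct IDs first, then resolve them all in a single batched lookup.
    static List<String> joinBatched(List<String> employeeDeptIds, CountingDepartmentStore store) {
        Map<String, String> byId = store.findAllById(new HashSet<>(employeeDeptIds));
        List<String> deptNames = new ArrayList<>();
        for (String deptId : employeeDeptIds) {
            deptNames.add(byId.get(deptId));
        }
        return deptNames;
    }

    public static void main(String[] args) {
        Map<String, String> depts = Map.of("d1", "Engineering", "d2", "Sales");
        List<String> employeeDeptIds = List.of("d1", "d2", "d1", "d1", "d2");

        CountingDepartmentStore perEmployee = new CountingDepartmentStore(depts);
        joinPerEmployee(employeeDeptIds, perEmployee);
        CountingDepartmentStore batched = new CountingDepartmentStore(depts);
        joinBatched(employeeDeptIds, batched);

        System.out.println("per-employee lookups: " + perEmployee.roundTrips()); // 5
        System.out.println("batched lookups: " + batched.roundTrips());          // 1
    }
}
```

Both strategies produce the same joined result; only the number of department lookups differs (five versus one for five employees). The batched form trades this for holding the whole employee list in memory, which is worth weighing for very large result sets.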

6. Use Caching Wisely

@Service
public class CachedEmployeeService {
    
    private final Mono<List<Employee>> cachedEmployees;
    
    public CachedEmployeeService(EmployeeRepository repository) {
        this.cachedEmployees = repository.findAll()
            .collectList()
            .cache(Duration.ofMinutes(5));  // Cache for 5 minutes
    }
    
    public Mono<List<Employee>> getAllEmployees() {
        return cachedEmployees;
    }
}
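
The idea behind a time-bounded cache can be sketched without Reactor: load once, replay the stored value until the time-to-live has elapsed, then load again. The class below is a simplified JDK-only analogy and not how `Mono.cache(Duration)` is implemented; `TtlCached` and its injected clock are hypothetical names introduced so that expiry can be exercised deterministically.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class TtlCacheDemo {

    // Memoizes a loader's result for ttlMillis, measured with the supplied clock.
    static class TtlCached<T> implements Supplier<T> {
        private final Supplier<T> loader;
        private final long ttlMillis;
        private final LongSupplier clock;
        private T value;
        private long loadedAt;
        private boolean loaded = false;

        TtlCached(Supplier<T> loader, long ttlMillis, LongSupplier clock) {
            this.loader = loader;
            this.ttlMillis = ttlMillis;
            this.clock = clock;
        }

        @Override
        public synchronized T get() {
            long now = clock.getAsLong();
            // Reload on first use or once the cached value is older than the TTL.
            if (!loaded || now - loadedAt >= ttlMillis) {
                value = loader.get();
                loadedAt = now;
                loaded = true;
            }
            return value;
        }
    }

    public static void main(String[] args) {
        int[] loads = {0};
        TtlCached<Integer> cached =
            new TtlCached<>(() -> ++loads[0], 5 * 60 * 1000L, System::currentTimeMillis);
        cached.get();
        cached.get();
        System.out.println("loader invocations: " + loads[0]); // 1, the second call is served from cache
    }
}
```

One practical consequence carries over to the reactive version: every caller within the time window sees the same snapshot, so data written in the meantime stays invisible until the entry expires.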

7. Manage Backpressure

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Employee> streamEmployees() {
    return employeeService.getAllEmployees()
        .limitRate(20)  // Control demand
        .delayElements(Duration.ofMillis(100));  // Throttle emission
}
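
Demand control of this kind rests on the Reactive Streams `request(n)` signal. The sketch below shows that signal using the JDK's `java.util.concurrent.Flow` interfaces and `SubmissionPublisher` in place of Reactor. It is an assumption-laden illustration: `BatchingSubscriber` is a hypothetical class, and it replenishes demand only after a full batch is consumed, which is simpler than Reactor's own replenishment strategy for `limitRate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BatchingSubscriberDemo {

    // Subscriber that never has more than batchSize items outstanding.
    static class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        private final List<T> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int consumedInBatch = 0;
        private int requestCalls = 0;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch(); // initial demand
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (++consumedInBatch == batchSize) {
                consumedInBatch = 0;
                requestBatch(); // ask for more only after the batch is fully consumed
            }
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        private void requestBatch() {
            requestCalls++;
            subscription.request(batchSize);
        }

        List<T> received() { return received; }

        int requestCalls() { return requestCalls; }

        void awaitDone() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("stream did not terminate in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    // Publishes 1..items and returns the subscriber after the stream has terminated.
    static BatchingSubscriber<Integer> run(int items, int batchSize) {
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= items; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items have been delivered
        subscriber.awaitDone();
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber<Integer> subscriber = run(10, 4);
        System.out.println(subscriber.received().size() + " items, "
            + subscriber.requestCalls() + " request calls"); // 10 items, 3 request calls
    }
}
```

With ten items and a batch size of four, the subscriber issues three `request(4)` calls: one on subscription, one after item 4, and one after item 8. The publisher never delivers more than the subscriber has asked for, which is the property that backpressure operators build on.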

8. Use Proper Logging

public Mono<Employee> getEmployee(String id) {
    return employeeRepository.findById(id)
        .doOnSubscribe(s -> log.info("Subscribing to employee query: {}", id))
        .doOnNext(emp -> log.info("Found employee: {}", emp.getName()))
        .doOnError(e -> log.error("Error finding employee: {}", id, e))
        // doOnSuccess also fires on empty completion, in which case emp is null
        .doOnSuccess(emp -> log.info("Query succeeded, employee present: {}", emp != null))
        .doFinally(signal -> log.info("Query completed with signal: {}", signal));
}

Conclusion

Spring WebFlux changes how web applications handle concurrency. It comes with a steeper learning curve, but for the right workloads the benefits of reactive programming (scalability, efficient resource use, and resilience under load) make it a valuable tool.

Key Takeaways

  1. Reactive Programming: Built around data streams and asynchronous, non-blocking operations
  2. Spring WebFlux: Spring’s reactive web framework using Project Reactor
  3. Mono and Flux: Core reactive types for 0-1 and 0-N elements
  4. Non-Blocking: Event-loop model enables high concurrency with fewer threads
  5. Backpressure: Consumers control the flow so that producers cannot overwhelm them
  6. Error Handling: Requires a different approach, using operators like onErrorResume
  7. Testing: Use WebTestClient and StepVerifier for reactive code
  8. Best Use Cases: High concurrency, streaming, real-time applications
  9. Not a Silver Bullet: Traditional Spring MVC still appropriate for many use cases

Next Steps

To master Spring WebFlux:

  1. Practice: Build small reactive applications
  2. Experiment: Try different operators and combinations
  3. Read Documentation: Project Reactor docs are excellent
  4. Study Patterns: Learn reactive design patterns
  5. Measure: Always benchmark your specific use case
  6. Stay Updated: The reactive ecosystem evolves rapidly
