In this technical post we will see how to integrate Sever-sent events in a Spring Webflux application. Please read this previous Spring Webflux Basics before conitnue with this information. Then, let’s create a new Spring Boot project with Webflux and Lombok as dependencies:
spring init --dependencies=webflux,lombok --language=java --build=gradle spring-boot-sse
Here is the complete build.gradle
file generated:
plugins {
id 'org.springframework.boot' version '2.3.4.RELEASE'
id 'io.spring.dependency-management' version '1.0.10.RELEASE'
id 'java'
}
group = 'com.jos.dem.springboot.sse'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '13'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testImplementation 'io.projectreactor:reactor-test'
}
test {
useJUnitPlatform()
}
Let’s start by creating a controller to serve our stream data
package com.jos.dem.springboot.sse.controller;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.jos.dem.springboot.sse.model.Event;
import com.jos.dem.springboot.sse.service.MessageService;
@RestController
@RequiredArgsConstructor
public class MessageController {
private final MessageService messageService;
@GetMapping(path = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> index() {
return messageService.stream();
}
}
MediaType.TEXT_EVENT_STREAM_VALUE
represents plain text event sent that follows SSE format, responses starts with "data:"
. Now let’s use Event
as domain transfer object
package com.jos.dem.springboot.sse.model;
import java.time.Instant;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
private String nickname;
private String text;
private Instant timestamp;
}
Next step is create a service which returns a Flux
object with our data
package com.jos.dem.springboot.sse.service;
import reactor.core.publisher.Flux;
import com.jos.dem.springboot.sse.model.MessageCommand;
public interface MessageService {
Flux<MessageCommand> stream();
}
Here is our implementation
package com.jos.dem.springboot.sse.service.impl;
import java.time.Instant;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import org.springframework.stereotype.Service;
import com.jos.dem.springboot.sse.model.Event;
import com.jos.dem.springboot.sse.service.MessageService;
import com.jos.dem.springboot.sse.util.EventGenerator;
@Service
@RequiredArgsConstructor
public class MessageServiceImpl implements MessageService {
private final EventGenerator eventGenerator;
public Flux<Event> stream() {
return Flux.interval(Duration.ofSeconds(1))
.map(it -> new Event("josdem", eventGenerator.generate(), Instant.now()));
}
}
Where:
Flux.interval
Simulate data streaming every secondeventGenerator
Generates a random message
Here is our message generator class
package com.jos.dem.springboot.sse.util;
import java.util.List;
import java.util.Arrays;
import java.util.Random;
import org.springframework.stereotype.Component;
@Component
public class EventGenerator {
private List<String> events = Arrays.asList(
"Bonjour",
"Hola",
"Zdravstvuyte",
"Salve",
"Guten Tag",
"Hello");
private final Random random = new Random(events.size());
public String generate() {
return events.get(random.nextInt(events.size()));
}
}
So, now if you execute our Spring Boot application
gradle bootRun
And hit our Spring Boot applicaiton end-point
curl http://localhost:8080/
Then you should see something similar to this output
data:{"nickname":"josdem","text":"Hola","timestamp":"2019-04-25T12:51:48.693987Z"}
data:{"nickname":"josdem","text":"Bonjour","timestamp":"2019-04-25T12:51:49.692895Z"}
data:{"nickname":"josdem","text":"Zdravstvuyte","timestamp":"2019-04-25T12:51:50.693260Z"}
data:{"nickname":"josdem","text":"Bonjour","timestamp":"2019-04-25T12:51:51.693231Z"}
data:{"nickname":"josdem","text":"Hello","timestamp":"2019-04-25T12:51:52.693324Z"}
Finally, let’s use a Non-blocking, reactive client for testing our web layer, if you want to know more about WebTestClient, please go to my previous post here.
package com.jos.dem.springboot.sse;
import com.jos.dem.springboot.sse.model.Event;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Slf4j
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class ServerSentEventsClientApplicationTest {
private final WebTestClient webClient;
@Test
@DisplayName("Should get five events")
void shouldConsumeServerSentEvents(TestInfo testInfo) {
log.info("Running: {}", testInfo.getDisplayName());
List<Event> events = webClient.get().uri("/")
.accept(MediaType.valueOf(MediaType.TEXT_EVENT_STREAM_VALUE))
.exchange()
.expectStatus().isOk()
.returnResult(Event.class)
.getResponseBody()
.take(5)
.collectList()
.block();
events.forEach(event -> log.info("event: {}", event));
assertEquals(5, events.size());
}
}
NOTE: Do not forget to add this lines to your build.gradle
file so that you can use Lombok in test cases.
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor "org.projectlombok:lombok"
To run the project:
gradle bootRun
To test the project:
gradle test
To browse the project go here, to download the project:
git clone git@github.com:josdem/spring-boot-sse.git