Mono blockingWrapper = Mono.fromCallable(() -> {
return /* make a remote synchronous call */
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());
@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
.subscribeOn(Schedulers.elastic());
}
@Configuration
public class SchedulerConfiguration {
private final Integer connectionPoolSize;
public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
}
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
}
@Service
public class AddressService {
private final AddressRepository repository;
private final Scheduler scheduler;
public AddressRouter(AddressRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
this.repository = repository;
this.scheduler = scheduler;
}
public Mono<Iterable<Address>> findAll() {
return async(() -> repository.findAll());
}
private <T> Mono<T> async(Callable<T> callable) {
return Mono.fromCallable(callable).publishOn(scheduler);
}
}