Published 26 Oct, 2022

Java - How to merge two execute two reactor fluxes in parallel that return lists and merge the results

Category Java
Modified : Nov 30, 2022
40

I have a LegacyAccountDto that I need to build a list of from two separate sources. One is a local JPA repository and the other is a web service call. The web service version has the accountStatus available where the JPA data source does not. I need to execute two calls in parallel as Fluxes, and then when they both complete, I need to find the legacyId of the web service list and populate the list with the accountStatus pulled from the web service. The whole idea is to return a list with the completed DTO. I don't need to save it back to the web service or the JPA repo

The DTO:

@Data
@JsonInclude(Include.NON_NULL)
public class LegacyAccountDto {
  private UUID id;
  private UUID organizationId;
  private String name;
  private String website;
  private Long legacyAccountId;
  private LocalDateTime legacyCreated;
  private String accountType;
  private String accountState;
}

Each function in the merge statement returns a Flux of LegacyDTO

Flux<LegacyAccountDto> completed = Flux.merge(
      getLegacyAccountsFromSvc(accountNames),
      Flux.fromIterable(accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()))
    )
    .parallel()
    .runOn(Schedulers.elastic())
    .???????((list1, list2) -> {
      list2.map(l2 -> {
        //find list1 by legacyId
        //set l2.accountStatus = l1.accountstatus
      })
      //return the completed list as a flux
    })

I'm not sure what function to call next to be able to have access to both lists and grab the accountStatus out of the second call and be able to merge it and have it not be returning a parallel flux type rather than just the Flux of LegacyDto

Answers

There are 1 suggested solutions here and each one has been listed below with a detailed description. The following topics have been covered briefly such as Java, Spring Boot, Project Reactor. These have been categorized in sections for a clear and precise explanation.

43

You could do it this way:

Mono<List<LegacyAccountDto>> firstMonoList = getLegacyAccountsFromSvc(accountNames).collectList();
Mono<List<EntityX> secondMonoList = accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()).collectList();

Mono.zip(firstMonoList, secondMonoList)
.map(listTuple -> {
  // do the searching and map to list of dtos
  return resultingList;
})
.flatMapMany(Flux::fromIterable);

It's not advised to use blocking database calls in reactive flow. If you have the option you could add the R2DBC driver and make the whole thing reactive.