Java – How to use the Scatter-Gather pattern correctly?

I’m still new to Spring Integration, and I tried using IntegrationFlowDefinition.scatterGather() without success. The general idea is:

  • Take a String as input
  • Use a splitter to tokenize it on the space character
  • Send each token to a set of services, each of which returns a value
  • Aggregate the services’ responses for each token
  • Aggregate those per-token results into a List
  • Convert the List to a single String
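
The intended data flow in the steps above can be sketched in plain Java, without Spring Integration, to make the expected result concrete. This is only an illustration of the transformation, not of the messaging machinery: the class name `ScatterGatherSketch`, the `SERVICES` list, and the `process` method are hypothetical names introduced here, and the two lambdas stand in for the real services.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ScatterGatherSketch {

    // Hypothetical stand-ins for the services that each token is sent to.
    static final List<Function<String, String>> SERVICES = List.of(
            token -> token + " - flow 1",
            token -> token + " - flow 2");

    static String process(String input) {
        return Arrays.stream(input.split(" "))              // split the input on spaces
                .map(token -> SERVICES.stream()             // "scatter": send the token to every service
                        .map(service -> service.apply(token))
                        .collect(Collectors.joining(", "))) // "gather": join the replies for this token
                .map(line -> "- " + line)                   // prefix each per-token result
                .collect(Collectors.joining("\n"));         // combine all results into one String
    }

    public static void main(String[] args) {
        System.out.println(process("this is a test"));
    }
}
```

In a real flow each of these stages is a separate endpoint that relies on message headers for correlation, which is where the behaviour can diverge from this purely functional version.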

I’m using a messaging gateway as my input/output:

@MessagingGateway(name = "gateway")
public interface TestGateway {

  @Gateway(requestChannel = "input")
  String process(String input);

}

And call it via:

final TestGateway gateway = (TestGateway) ctx.getBean("gateway");
System.out.println(gateway.process("this is a test"));

My IntegrationFlow is:

@Bean
public IntegrationFlow soutFlow() {
  return f -> f.handle(m -> System.out.println(m.toString()));
}

@Bean
public IntegrationFlow flow() {
  return IntegrationFlows.from("input")
    .split(s -> s.delimiters(" "))
    .wireTap(soutFlow())
    .scatterGather(
      sc -> sc.recipientFlow(m -> true, f1 -> f1.handle((p, h) -> p + " - flow 1").wireTap(soutFlow()))
        .recipientFlow(m -> true, f2 -> f2.handle((p, h) -> p + " - flow 2").wireTap(soutFlow()))
        .applySequence(true),
      ga -> ga.outputProcessor(mg -> mg.getMessages()
        .stream()
        .map(m -> m.getPayload().toString())
        .collect(Collectors.joining(", "))),
      sg -> sg.gatherTimeout(1_000))
    .wireTap(soutFlow())
    .aggregate()
    .wireTap(soutFlow())
    .transform((List<String> source) -> source.stream()
      .map(s -> "- " + s)
      .collect(Collectors.joining("\n")))
    .get();
}

The output format I would like to see is as follows:

- this - flow 1, this - flow 2
- is - flow 1, is - flow 2
- a - flow 1, a - flow 2
- test - flow 1, test - flow 2

Unfortunately, this doesn’t work: the gatherer always times out after 1 second. The wire-tap debug output shows the following messages:

GenericMessage [payload=this, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber= 1, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId= 7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=6c81a1ce-9a7c-83c3-3b69-7fb2f8b3112c, timestamp=1538668173435}]
GenericMessage [payload=this - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:1, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 1, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=8d29aea7-3ec7-2c15-5b92-fbb0af002a3f, id= 7ca4de30-f801-ad14-6452-249c85e9ab36, timestamp=1538668173446}]
GenericMessage [payload=this - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:1, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 1, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=8d29aea7-3ec7-2c15-5b92-fbb0af002a3f, id= 71b22112-8e7b-c2c5-057f-2a91c49af8c8, timestamp=1538668173446}]
GenericMessage [payload=is, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId= 7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=9da2f181-82de-d50e-51bb-229a3e32b998, timestamp=1538668174448}]
GenericMessage [payload=is - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:2, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 2, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0abfd6a8-713d-e0cb-bc59-a111a85429df, id= d5102417-f6c2-cbac-197a-b9c4f3f51a74, timestamp=1538668174448}]
GenericMessage [payload=is - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:2, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 2, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0abfd6a8-713d-e0cb-bc59-a111a85429df, id= d9b31390-d3d1-9b61-43fc-45c06ac86931, timestamp=1538668174449}]
GenericMessage [payload=a, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=3, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId= 7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=f109f368-6859-ac8d-0e0c-3beb563f9594, timestamp=1538668175451}]
GenericMessage [payload=a - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:3, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 3, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0e347381-a6e2-891a-72ff-c862565d6a1e, id= e0270063-6e19-5827-7674-21fa047ab11d, timestamp=1538668175451}]
GenericMessage [payload=a - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:3, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 3, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=0e347381-a6e2-891a-72ff-c862565d6a1e, id= b08aab99-10ce-4dac-2f3c-b48b4514fd35, timestamp=1538668175451}]
GenericMessage [payload=test, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceNumber=4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=4, correlationId= 7c9653a7-55eb-72e4-ff74-62b10782cc3b, id=52feeb27-8da4-70c4-889e-f482d2922e40, timestamp=1538668176453}]
GenericMessage [payload=test - flow 1, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=1, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:4, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 4, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=53e7fd2c-d3a4-0c0a-1296-2a14ef5e90ab, id= 9152e603-e436-c0a4-abde-1f7e5ae654bf, timestamp=1538668176454}]
GenericMessage [payload=test - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@78aea4b9, sequenceNumber=2, gatherResultChannel= ac418949-6ade-4db5-bd4c-3cbbbb7e9a49:4, sequenceDetails=[[7c9653a7-55eb-72e4-ff74-62b10782cc3b, 4, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@649725e3, sequenceSize=2, correlationId=53e7fd2c-d3a4-0c0a-1296-2a14ef5e90ab, id= a7357efd-d34a-6a05-159e-5860e3bc5281, timestamp=1538668176454}]

I must have missed something obvious, but Spring documentation is scarce when it comes to good Java DSL examples.

Update:

Using wireTap() actually affects the flow. I’ve removed it and set some breakpoints. It looks like the gatherer handles the scattered and processed messages properly; the problem seems to be sending them further downstream. The exception is:

org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.core.DestinationResolutionException:
 failed to look up MessageChannel with name '27cf1bec-7e06-424e-80f8-9f2f67ebaf87:2' in the BeanFactory.;
 nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '27cf1bec-7e06-424e-80f8-9f2f67ebaf87:2' available,
 failedMessage=GenericMessage [payload=is - flow 1, is - flow 2, headers={replyChannel=org.springframework.integration.channel.FixedSubscriberChannel@4bafe935, sequenceNumber=2,  gatherResultChannel=27cf1bec-7e06-424e-80f8-9f2f67ebaf87:2, sequenceDetails=[[2baa7135-5c02-932f-6b5d-cfd4052a965b, 2, 4]], errorChannel= org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@87b5b49, sequenceSize=2, correlationId=c04f280d-83b7-74e9-a0a6-0f89efdf795c, id= ffec13cf-531e-0721-1484-e8b26b705330, timestamp=1538735548924}]

I suppose these intermediate channels are automatically created by Spring when using IntegrationFlow. Isn’t that so?

Solution

The problem is that the gatherer is configured incorrectly. My assumption was that if the input to scatter-gather is a single String (a tokenized word), the output should also be a single String. Unfortunately, a gatherer customized this way does not properly restore the sequence/correlation headers, causing the next aggregator to wait indefinitely for its group to complete. Passing null as the gatherer restores the default behavior. The downside is that the scatter-gather output is now a List. This is easily mitigated by placing a transformer that flattens the list before the next aggregator. The following code works:

@Bean
public IntegrationFlow flow() {
  return IntegrationFlows.from("input")
    .split(s -> s.delimiters(" "))
    .scatterGather(
      sc -> sc
        .applySequence(true)
        .recipientFlow(m -> true, f1 -> f1.handle((p, h) -> p + " - flow 1"))
        .recipientFlow(m -> true, f2 -> f2.handle((p, h) -> p + " - flow 2")),
      null
    )
    .transform((List<String> l) -> l.stream().collect(Collectors.joining(", ")))
    .aggregate()
    .transform((List<String> source) -> source.stream()
      .map(s -> "- " + s)
      .collect(Collectors.joining("\n")))
    .get();
}

The output is now exactly as expected:

- this - flow 1, this - flow 2
- is - flow 1, is - flow 2
- a - flow 1, a - flow 2
- test - flow 1, test - flow 2
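
To illustrate why stale sequence headers would keep the downstream aggregator waiting, here is a deliberately simplified plain-Java model. It is not Spring Integration's implementation: `Msg`, `releasedGroups`, `staleHeaders`, and `restoredHeaders` are hypothetical names, and the toy aggregator only mimics the general idea of releasing a group once it holds `sequenceSize` messages with the same `correlationId`. The "stale" case mirrors the failed message in the question, which still carries sequenceSize=2 and a per-token correlationId.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SequenceHeaderSketch {

    // Simplified stand-in for a message: payload plus the two headers a
    // sequence-based aggregator looks at. Not Spring's Message type.
    static final class Msg {
        final String payload;
        final String correlationId;
        final int sequenceSize;

        Msg(String payload, String correlationId, int sequenceSize) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
        }
    }

    // Toy aggregator: groups messages by correlationId and releases a group
    // once it holds sequenceSize messages. Returns how many groups were released.
    static int releasedGroups(List<Msg> messages) {
        Map<String, List<Msg>> groups = new HashMap<>();
        int released = 0;
        for (Msg m : messages) {
            List<Msg> group = groups.computeIfAbsent(m.correlationId, k -> new ArrayList<>());
            group.add(m);
            if (group.size() == m.sequenceSize) {
                groups.remove(m.correlationId);
                released++;
            }
        }
        return released;
    }

    // Gathered results that still carry the inner scatter headers:
    // a distinct correlationId per token and sequenceSize 2.
    static List<Msg> staleHeaders() {
        return List.of(
                new Msg("this - flow 1, this - flow 2", "inner-1", 2),
                new Msg("is - flow 1, is - flow 2", "inner-2", 2),
                new Msg("a - flow 1, a - flow 2", "inner-3", 2),
                new Msg("test - flow 1, test - flow 2", "inner-4", 2));
    }

    // Gathered results with the splitter's headers restored:
    // one shared correlationId and sequenceSize 4.
    static List<Msg> restoredHeaders() {
        return List.of(
                new Msg("this - flow 1, this - flow 2", "outer", 4),
                new Msg("is - flow 1, is - flow 2", "outer", 4),
                new Msg("a - flow 1, a - flow 2", "outer", 4),
                new Msg("test - flow 1, test - flow 2", "outer", 4));
    }

    public static void main(String[] args) {
        System.out.println("stale headers, released groups: " + releasedGroups(staleHeaders()));
        System.out.println("restored headers, released groups: " + releasedGroups(restoredHeaders()));
    }
}
```

In this model the stale case never completes a group (each group holds one message but expects two), while the restored case releases a single group of four, which corresponds to the final aggregate() step producing one List.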
