When using Spring Cloud Stream with RabbitMQ, and trying to send multiple messages in a transactional way (e.g. via StreamBridge), it is important to ensure the entire flow is transactional — including message consumption and any downstream message publication.

Step 1: Define function

@Component("myfunction")
public class MyFunction implements Consumer<String> {

    private final StreamBridge streamBridge;
    public MyFunction(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    @Transactional
    public void accept(String message) {
        streamBridge.send("myfunction-out-0", message);
        if (message.equals("error")) {
            throw new RuntimeException("Simulated failure");
        }
    }
}

Step 2: Configuration, application.yml

spring:
  cloud:
    function:
      definition: myfunction

    stream:
      bindings:
        myfunction-in-0:
          destination: test.request
          group: test-group
        myfunction-out-0:
          destination: test.response

      rabbit:
        bindings:
          myfunction-in:
            consumer:
              transacted: true
          myfunction-out:
            producer:
              transacted: true

  rabbitmq:
    host: 
    port: 
    username: 
    password: 

Note:
Use myfunction-in-0 and myfunction-out-0 in stream.bindings
Use myfunction-in and myfunction-out in rabbit.bindings

Step 3: Transaction Manager Bean

@Configuration
public class TransactionConfig {
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

The -in-0 style works for stream.bindings, but not for rabbit.bindings, which expects the logical binding name (myfunction-in)

Need Help With Cloud Development?

Work with our skilled Cloud developers to accelerate your project and boost its performance.

Hire Cloud Developers

Support On Demand!

Related Q&A