When using Spring Cloud Stream with RabbitMQ, and trying to send multiple messages in a transactional way (e.g. via StreamBridge), it is important to ensure the entire flow is transactional — including message consumption and any downstream message publication.
@Component("myfunction")
public class MyFunction implements Consumer<String> {
private final StreamBridge streamBridge;
public MyFunction(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@Override
@Transactional
public void accept(String message) {
streamBridge.send("myfunction-out-0", message);
if (message.equals("error")) {
throw new RuntimeException("Simulated failure");
}
}
}
spring:
cloud:
function:
definition: myfunction
stream:
bindings:
myfunction-in-0:
destination: test.request
group: test-group
myfunction-out-0:
destination: test.response
rabbit:
bindings:
myfunction-in:
consumer:
transacted: true
myfunction-out:
producer:
transacted: true
rabbitmq:
host:
port:
username:
password:
Note:
Use myfunction-in-0 and myfunction-out-0 in stream.bindings
Use myfunction-in and myfunction-out in rabbit.bindings
@Configuration
public class TransactionConfig {
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
The -in-0 style works for stream.bindings, but not for rabbit.bindings, which expects the logical binding name (myfunction-in)
Work with our skilled Cloud developers to accelerate your project and boost its performance.
Hire Cloud Developers