Implementing a CQRS Message Bus in Spring with ResolvableType
An important aspect of Command Query Responsibility Segregation (CQRS) is decoupling the actions a system can perform from the way it performs them. This is usually implemented with a message pattern, where the entry points of the system construct an object corresponding to an action that should be taken, and the internal system executes it. This pattern is known under multiple names, such as mediator, bus, or dispatcher, but this post uses dispatcher. It is typically implemented in the following way:
- A
Message<R>
interface specifies the available actions in the system.- In a CQRS context, you would usually split this into a
Command
and aQuery<R>
.
- In a CQRS context, you would usually split this into a
- A
MessageDispatcher
interface specifies<M extends Message<R>, R> R dispatch(M message)
, which entry points of the system use to execute actions. - Internally, a Bean implements an
MessageHandler
functional interface with the methodR handle(Q query)
.
The standard implementations of these interfaces are:
public interface Message<R> {}
public interface MessageDispatcher {
<M extends Message<R>, R> R dispatch(M message);
}
// The MessageHandler should be internal to the application layer
public interface MessageHandler<M extends Message<R>, R> {
R handle(M query);
}
The Message
and MessageHandler
are left abstract, as each implementation will be solution-specific. The MessageDispatcher
requires an implementation that supports routing messages into the system. A simple approach I have used successfully in smaller solutions is:
public class MessageDispatcherImpl extends MessageDispatcher{
@Autowired
public final List<MessageHandler<?, ?>> handlers;
public <M extends Message<R>, R> R dispatch(M message) {
for (final var handler : handlers) {
if (handlerMatches(message, handler)) {
return handler.handle(message);
}
}
throw new IllegalArgumentException("No handler exists for " + message.getClass());
}
private boolean handlerMatches(Message<?> message, MessageHandler<?, ?> handler) {
// In theory, this method would compare the message type with the handler's generic type.
// However, due to Java's type erasure and the possibility of Spring proxies, this approach
// can be unreliable and difficult to maintain. That's why I left it out
}
}
This generic approach works well when the number of commands in the system is small. However, as the system grows, Spring may provide Proxy
implementations of the MessageHandler
s instead of the actual implementations. This causes problems with Java’s standard generic reflection, which may fail to find the correct interfaces and generic arguments, resulting in no implementations being found.
I initially tried an implementation where each MessageHandler
has a boolean supports(Message<?> m)
method. However, this can become inefficient when the list of handlers grows large, as you may need to query many handlers before finding the correct one. We needed an implementation that retrieves the correct handler on the first attempt.
This led me to using the ApplicationContext
to locate the correct implementation. This pattern is usually called the Service Locator Pattern, which I mostly view as an anti-pattern because excessive use can make systems difficult to understand and test. However, in this specific case, it represents a correct use case, an exception that proves the rule.
Java uses type erasure, which means generic type information is not fully retained at runtime. For example, a List<String>
becomes just a List
once the program is running. This makes it challenging to locate the correct MessageHandler
for a given message using standard reflection. Fortunately, Spring provides ResolvableType
, which allows us to inspect generic types at runtime and retrieve the correct handler from the ApplicationContext
. Using this approach, the MessageDispatcherImpl
can be set up as follows:
@Service
@Transactional
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
@RequiredArgsConstructor
public class MessageDispatcherImpl implements MessageDispatcher {
ApplicationContext applicationContext;
@Override
@SuppressWarnings("unchecked")
public <M extends Message<R>, R> R dispatch(M message) {
final var messageClass = ResolvableType.forClass(message.getClass());
final var resultClass = Arrays.stream(messageClass.getInterfaces())
.filter(type -> null != type.getRawClass())
// If the message implements multiple interfaces we need to find the right one
.filter(type -> type.getRawClass().isAssignableFrom(Message.class))
.findAny()
// Will never throw as message is guaranteed to implement Message in one of its interfaces.
.orElseThrow()
// Message only has 1 required generic argument, so [0] can be used.
.getGenerics()[0];
final var messageHandler = (MessageHandler<M, R>) applicationContext.getBeanProvider(
ResolvableType.forClassWithGenerics(
MessageHandler.class,
messageClass,
resultClass
)
).getObject();
return messageHandler.handle(message);
}
}
This way of implementing the dispatcher is compact and pragmatic. However, there are two common pitfalls: if you introduce a new Message
without providing a corresponding MessageHandler
, the dispatcher will throw a NoSuchBeanDefinitionException
; and if multiple handlers exist for the same Message
, the dispatcher will throw a NoUniqueBeanDefinitionException
.
This pattern works very well for separating external layers, such as REST, from the internal application. I have used it successfully across multiple projects. In Java, the previous versions work well. For C#, I recommend the MediatR library.
This is a relatively recent implementation, but so far it has worked well. If any problems are discovered, I will be sure to revisit this post.