This post was co-written with Ben Wilcock, Product and Technical Marketing Manager for Spring at Pivotal.
🔔 A file has been uploaded! 🔔
🔔 A new user was registered! 🔔
🔔 An order was placed! 🔔
The above is an example of an event-driven architecture: instead of reaching out to each service one by one, our services emit changes of state as events. If a file is uploaded, our file service can emit an event to a messaging platform, and then our Super Duper Image Resizer 3000 service can listen for that event and automatically generate differently sized profile images. Pivotal's own Richard Seroter has written about this in detail, and it's a great read. In his blog post, Richard talks about messaging as a way of reliably delivering events to many consumers quickly and in volume.
He also touches on something we want to talk about today: event streaming platforms. We're big fans of both Kafka and RabbitMQ, so for this demo we'll use Kafka. No matter which you choose, making it easy to produce and consume events is important for your developers. I've used a lot of frameworks that abstract away the underlying message queue, but none quite as easy and flexible as Spring Cloud Stream. My teammate Ben Wilcock put together a demo that shows just how easy it is to get up and running. Let's take it for a spin. To follow along, grab the demo code and start the backing services with docker-compose up.
Let’s start by producing some messages that will be sent to Kafka, the code for which is in the
loansource
directory.There are a few files of code here. The
Loan.java
file defines a loan
object and the Statuses.java
file defines all the states a loan can be in. What’s interesting, though, is the LoansourceApplication.java
file, which is what’s actually producing our messages. As you can imagine, Spring and its dependencies handle a lot of the wiring up of components for us automatically. Let’s take a look at
LoansourceApplication.java
to see how this works.@Bean
public Supplier<Loan> supplyLoan() {
  return () -> {
    String rName = names.get(new Random().nextInt(names.size()));
    Long rAmount = amounts.get(new Random().nextInt(amounts.size()));
    Loan loan = new Loan(UUID.randomUUID().toString(), rName, rAmount);
    log.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    return loan;
  };
}
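Because the supplier above is plain business logic, it can be exercised without a broker or a Spring context. Here's a minimal sketch of such a test, using a simplified stand-in for the demo's Loan class (the real Loan.java isn't shown in this post, so its exact fields and constructor are an assumption):

```java
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;

// Simplified stand-in for the demo's Loan.java -- field names are assumed.
class Loan {
    private final String uuid;
    private final String name;
    private final Long amount;
    private String status = "PENDING"; // new loans start out pending, as in the demo's logs

    Loan(String uuid, String name, Long amount) {
        this.uuid = uuid;
        this.name = name;
        this.amount = amount;
    }

    String getUuid() { return uuid; }
    String getName() { return name; }
    Long getAmount() { return amount; }
    String getStatus() { return status; }
}

public class SupplyLoanTest {
    public static void main(String[] args) {
        List<String> names = List.of("Donald", "Jacinda", "Vladimir");
        List<Long> amounts = List.of(100L, 10000L, 10000000L);

        // Same shape as the supplyLoan() bean, minus the Spring wiring.
        Supplier<Loan> supplyLoan = () -> {
            String rName = names.get(new Random().nextInt(names.size()));
            Long rAmount = amounts.get(new Random().nextInt(amounts.size()));
            return new Loan(UUID.randomUUID().toString(), rName, rAmount);
        };

        Loan loan = supplyLoan.get();
        // Plain assertions -- no broker, no binder, no Spring context needed.
        assert names.contains(loan.getName());
        assert amounts.contains(loan.getAmount());
        assert "PENDING".equals(loan.getStatus());
        System.out.println("supplier produced: " + loan.getStatus()
                + " for $" + loan.getAmount() + " for " + loan.getName());
    }
}
```

Run with java -ea to enable the assert statements.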
Supplier<> is a Java functional interface. Because there is only one @Bean method that returns this type, Spring Cloud Stream knows exactly what to do next. By default, it will trigger this function once every second and send the result to the default MessageChannel named output. What's nice about this function method is that it contains only business logic, so you can test it using your favorite testing methods.

We could use the spring.cloud.function.definition property in the application.properties file to explicitly declare which function bean we want bound to the binding destinations, but when you only have a single @Bean defined, this is not necessary. Likewise, if we wanted a different poller interval, we could set the spring.integration.poller.fixed-delay property in the application.properties file.

The only question that remains is, "How does Spring know it's Kafka we're writing to?" For that, we take a look at our pom.xml:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Providing this dependency tells Spring, "I'd like to send these messages to Kafka." Since our Kafka server is listening on localhost on the default port, we don't need any additional configuration in our application.properties file; if that's not the case, we'd provide details such as hostname, port, and authentication there.

We can run our code and activate the kafka profile, which we've configured to include the Kafka SCS binding, and we should see it start producing messages:

cd loansource
./mvnw package spring-boot:run -DskipTests=true -Pkafka
2019-10-15...LoansourceApplication : PENDING 9eff9b58-e1f1-474d-8f1d-aa4db8dbb75a for $10000000 for Donald
2019-10-15...LoansourceApplication : PENDING d507c06c-81bb-4a98-8f85-38f74af36984 for $100 for Jacinda
2019-10-15...LoansourceApplication : PENDING 19fc86a4-d461-470c-8005-423ce1a258e7 for $100 for Jacinda
2019-10-15...LoansourceApplication : PENDING 33f3756c-ea9b-472f-bad2-73f1647188b1 for $10000 for Vladimir
2019-10-15...LoansourceApplication : PENDING 1625d10f-c1c8-4e75-8fe8-10ce363ef56f for $10000000 for Theresa
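As an aside, if Kafka weren't running on the localhost defaults, the binder can be pointed elsewhere through application.properties. A hedged sketch (the broker address is an example; the property keys come from Spring Cloud Stream's Kafka binder and the poller setting mentioned earlier):

```properties
# Example only: point the Kafka binder at a non-default broker
spring.cloud.stream.kafka.binder.brokers=kafka.example.com:9092

# Explicitly name the function bean to bind (only needed with multiple @Beans)
spring.cloud.function.definition=supplyLoan

# Emit a loan every 5 seconds instead of the default 1 second
spring.integration.poller.fixed-delay=5000
```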
If you prefer, you can also see the messages in your browser. Simply point your browser to localhost:9000 and you should see a UI that lets you inspect the messages stored in Kafka.

We've got half of the equation here, but we also need something to consume and process these events. For this, we'll look in the loancheck directory. In this half of the demo, our loan checker observes every application and approves or declines it. If approved, an approval message is sent to the approved topic; otherwise, a denial message is sent to the declined topic. You can extrapolate from here that other systems down the line could listen for these messages and pick them up for further processing. For example, a payout system might listen for approved loans and start paying them out.

The code here looks much like the producer's, just pointing at different topics. In LoanCheckApplication.java, we have the @EnableBinding(LoanProcessor.class) annotation, meaning that all of our channel binding definitions are found in the LoanProcessor class.

In our LoanProcessor.java file, we define the MessageChannel we listen on, named output, matching the default topic our producer writes to. Additionally, we define two other MessageChannels that we write to: approved and declined. For each of these, the interface exposes a method that returns the corresponding channel:

@Component
public interface LoanProcessor {

  String APPLICATIONS_IN = "output";
  String APPROVED_OUT = "approved";
  String DECLINED_OUT = "declined";

  @Input(APPLICATIONS_IN)
  SubscribableChannel sourceOfLoanApplications();

  @Output(APPROVED_OUT)
  MessageChannel approved();

  @Output(DECLINED_OUT)
  MessageChannel declined();
}
Finally, we can see how this all ties together by taking a look at the LoanChecker.java file. There we have a method, checkAndSortLoans, whose @StreamListener annotation matches the input channel we defined previously:

@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
  log.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
  if (loan.getAmount() > MAX_AMOUNT) {
    loan.setStatus(Statuses.DECLINED.name());
    processor.declined().send(message(loan));
  } else {
    loan.setStatus(Statuses.APPROVED.name());
    processor.approved().send(message(loan));
  }
}
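The approve-or-decline rule above is a pure function of the loan amount, so it too can be checked without channels or a broker. A minimal sketch (the MAX_AMOUNT value here is an assumption for illustration; the real constant is defined in LoanChecker.java):

```java
public class LoanSortingTest {
    // Assumed threshold for illustration; the demo defines its own MAX_AMOUNT.
    static final long MAX_AMOUNT = 10_000_000L;

    // The same decision the @StreamListener makes, extracted as a pure function.
    static String sort(long amount) {
        return amount > MAX_AMOUNT ? "DECLINED" : "APPROVED";
    }

    public static void main(String[] args) {
        assert "APPROVED".equals(sort(1_000L));        // small loans sail through
        assert "APPROVED".equals(sort(MAX_AMOUNT));    // boundary value is not above the cap
        assert "DECLINED".equals(sort(100_000_000L));  // anything above the cap is declined
        System.out.println("sorting rule behaves as expected");
    }
}
```

Extracting the branch into a pure function like this is one easy way to keep the messaging plumbing out of your unit tests.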
We can start this code up much like we did our loansource, by opening a separate terminal and running the following:

cd loancheck
./mvnw package spring-boot:run -DskipTests=true -Pkafka
After a few moments, we’ll start seeing our pending messages come through and then get sorted into
approved
or declined
:2019-10-15...LoanChecker : PENDING 95a887cf-ab5f-48c4-b03b-556675446cfc for $1000 for Kim
2019-10-15...LoanChecker : APPROVED 95a887cf-ab5f-48c4-b03b-556675446cfc for $1000 for Kim
2019-10-15...LoanChecker : PENDING a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela
2019-10-15...LoanChecker : DECLINED a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela