To solve our problem, there is AsyncAPI. AsyncAPI is an open-source initiative that seeks to improve the current state of Event-Driven Architectures (EDA). AsyncAPI has several Java tools that allow you to generate documentation from code. Springwolf is my choice because it provides a UI similar to Springfox.
To get started, we need to add the following dependencies to build.gradle:
// Spring Cloud Stream with the Kafka binder (event production/consumption)
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.2'
// JSON-P API and its reference implementation — presumably required by Springwolf
// for payload example generation; verify against the Springwolf 0.6.x docs
implementation 'javax.json:javax.json-api:1.1.4'
implementation 'org.glassfish:javax.json:1.1.4'
// Lombok: compile-time generation of getters, builders, constructors used below
compileOnly 'org.projectlombok:lombok'
// Provides the documentation API
implementation 'io.github.springwolf:springwolf-kafka:0.6.1'
// Springwolf web UI, served at /springwolf/asyncapi-ui.html
runtimeOnly 'io.github.springwolf:springwolf-ui:0.4.0'
/**
 * Base type for all domain events published to / consumed from Kafka.
 *
 * <p>Polymorphic JSON (de)serialization is driven by the {@code type} property:
 * {@link JsonTypeInfo} reads the existing {@code type} field as the discriminator,
 * and {@link JsonSubTypes} maps each discriminator value to its concrete class.
 * Payloads with an unknown/missing {@code type} fall back to {@code defaultImpl}.
 */
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
@Setter(AccessLevel.PROTECTED)
@EqualsAndHashCode
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.EXISTING_PROPERTY,
    property = "type",
    visible = true,
    defaultImpl = EntityChangedEvent.class
)
// FIX: removed a duplicate, unnamed @JsonSubTypes.Type(value = EntityChangedEvent.class)
// entry. EntityChangedEvent is already registered by name above, and unknown
// discriminators are already handled by defaultImpl on @JsonTypeInfo.
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = EntityChangedEvent.type, value = EntityChangedEvent.class),
    @JsonSubTypes.Type(name = EntityDeletedEvent.type, value = EntityDeletedEvent.class)
})
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class DomainEvent {

    // Producer-assigned event id; may be null when the producer does not set one.
    private String id;

    // NOTE(review): "occuredOn" is misspelled ("occurredOn"), but the field name is
    // also the serialized JSON property name — renaming it would break the wire
    // format, so it is left as-is deliberately.
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime occuredOn = LocalDateTime.now();

    /** Discriminator value written to / read from the {@code type} JSON property. */
    public abstract String getType();
}
/**
 * Event emitted when an entity's attributes change.
 * Serialized with the discriminator {@code type = "ENTITY_CHANGED_EVENT"};
 * null fields are omitted from the JSON payload (NON_NULL).
 */
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@Setter(AccessLevel.PRIVATE)
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EntityChangedEvent extends DomainEvent {

    // Discriminator value; referenced by @JsonSubTypes on DomainEvent.
    public static final String type = "ENTITY_CHANGED_EVENT";

    private String title;
    private String description;
    private String code;

    // Required — Lombok @NonNull adds a runtime null check in generated code.
    @NonNull
    private String entityId;

    /**
     * Creates a change event; occuredOn is stamped with the current time via super().
     * At least one of {@code title}/{@code description} must be non-blank.
     *
     * @throws IllegalStateException if both title and description are blank
     * @throws NullPointerException  if entityId or code is null (Lombok @NonNull)
     */
    @Builder
    public EntityChangedEvent(
        String id,
        @NonNull String entityId,
        @NonNull String code,
        String title,
        String description
    ) {
        super(id, LocalDateTime.now());
        // Reject a "change" event that carries no actual changes.
        if(StringUtils.isAllBlank(title, description)) {
            throw new IllegalStateException("changes is none");
        }
        this.entityId = entityId;
        this.code = code;
        this.title = title;
        this.description = description;
    }

    @Override
    public String getType() {
        return type;
    }
}
/**
 * Event emitted when an entity is deleted.
 * Serialized with the discriminator {@code type = "ENTITY_DELETED_EVENT"};
 * null fields are omitted from the JSON payload (NON_NULL).
 */
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@Setter(AccessLevel.PRIVATE)
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EntityDeletedEvent extends DomainEvent {

    // Discriminator value; referenced by @JsonSubTypes on DomainEvent.
    public static final String type = "ENTITY_DELETED_EVENT";

    // Id of the deleted entity; required (Lombok @NonNull adds a null check).
    @NonNull
    private String entityId;

    /**
     * Creates a deletion event; occuredOn is stamped with the current time via super().
     *
     * FIX: the original parameter was named {@code valueId} and was assigned to
     * {@code this.valueId}, a field that does not exist (compile error). Renamed to
     * {@code entityId} to match the field, the {@code getEntityId()} accessor used by
     * the Kafka listener, and the sibling EntityChangedEvent builder API.
     *
     * @throws NullPointerException if entityId is null (Lombok @NonNull)
     */
    @Builder
    public EntityDeletedEvent(
        String id,
        @NonNull String entityId
    ) {
        super(id, LocalDateTime.now());
        this.entityId = entityId;
    }

    @Override
    public String getType() {
        return type;
    }
}
/**
 * Declares the Kafka listener-container factories, one per event family.
 * Connection settings are read from the Spring Cloud Stream binder properties.
 */
@Configuration
public class KafkaConsumerConfig {

    private final String bootstrapHost;
    private final String bootstrapPort;
    private final String consumerGroup;

    public KafkaConsumerConfig(
        @Value("${spring.cloud.stream.bindings.service-out-0.group}") String group,
        @Value("${spring.cloud.stream.kafka.binder.brokers}") String server,
        @Value("${spring.cloud.stream.kafka.binder.defaultBrokerPort}") String port
    ) {
        this.consumerGroup = group;
        this.bootstrapHost = server;
        this.bootstrapPort = port;
    }

    /**
     * Consumer factory whose keys are plain strings and whose values are
     * deserialized from JSON into the given payload class.
     */
    private <T> ConsumerFactory<String, T> jsonConsumerFactory(Class<T> payloadClass) {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:%s", bootstrapHost, bootstrapPort));
        return new DefaultKafkaConsumerFactory<>(
            config,
            new StringDeserializer(),
            new JsonDeserializer<>(payloadClass));
    }

    /** Wraps a typed consumer factory in a concurrent listener-container factory. */
    private <T> ConcurrentKafkaListenerContainerFactory<String, T> listenerFactoryFor(Class<T> payloadClass) {
        ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(jsonConsumerFactory(payloadClass));
        return containerFactory;
    }

    /**
     * Container factory for the abstract DomainEvent: accepts every DomainEvent
     * subclass (Jackson resolves the concrete type from the "type" discriminator).
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DomainEvent> domainEventKafkaListenerContainerFactory() {
        return listenerFactoryFor(DomainEvent.class);
    }
}
/**
 * Kafka consumers for the domain-event topic. The class-level @KafkaListener binds
 * every @KafkaHandler method in this class to "topic-test"; Spring dispatches each
 * record to the handler whose parameter type matches the deserialized payload.
 */
@Component
@Log
@KafkaListener(
topics = "topic-test",
groupId="group-test",
containerFactory="domainEventKafkaListenerContainerFactory")
public class KafkaListeners {

    // Handles payloads deserialized as EntityChangedEvent (type = ENTITY_CHANGED_EVENT).
    @KafkaHandler
    void entityChangedKafkaListener(@Payload EntityChangedEvent message) {
        log.info(String.format("KafkaHandler[EntityChanged] %s %s %s", message.getType(), message.getEntityId(), message.getId()));
    }

    // Handles payloads deserialized as EntityDeletedEvent (type = ENTITY_DELETED_EVENT).
    // NOTE(review): no @KafkaHandler(isDefault = true) fallback exists; a payload
    // matching neither handler type will raise a listener error — confirm intended.
    @KafkaHandler
    void entityDeletedKafkaListener(@Payload EntityDeletedEvent message) {
        log.info(String.format("KafkaHandler[EntityDeleted] %s %s %s", message.getType(), message.getEntityId(), message.getId()));
    }
}
/**
 * Springwolf AsyncAPI documentation setup. Builds the AsyncApiDocket that backs
 * the UI at /springwolf/asyncapi-ui.html and the JSON document at /springwolf/docs.
 *
 * Decomposed from one long bean method into small private helpers (info, server,
 * producers) so each documented section is built in one obvious place.
 */
@Configuration
@EnableAsyncApi
public class AsyncApiConfiguration {

    // Package scanned by Springwolf for @KafkaListener consumer classes.
    private static final String CONSUMERS_BASE_PACKAGE = "consumers.package";

    private final String server;
    private final String serverPort;
    private final String channelName;

    public AsyncApiConfiguration(
        @Value("${spring.cloud.stream.bindings.service-out-0.destination}") String channelName,
        @Value("${spring.cloud.stream.kafka.binder.brokers}") String server,
        @Value("${spring.cloud.stream.kafka.binder.defaultBrokerPort}") String port
    ) {
        this.channelName = channelName;
        this.server = server;
        this.serverPort = port;
    }

    /** Static application metadata shown in the AsyncAPI document header. */
    private Info applicationInfo() {
        return Info.builder()
            .version("api version")
            .title("service name")
            .description("description")
            .build();
    }

    /** Kafka broker entry for the AsyncAPI "servers" section. */
    private Server kafkaServer() {
        return Server.builder()
            .protocol("kafka")
            .url(String.format("%s:%s", server, serverPort))
            .build();
    }

    /**
     * One ProducerData per documented event type, all sharing the same channel
     * and Kafka bindings. The builder is reused: payloadType() overwrites the
     * previous value before each build(), so every element is fully populated.
     */
    private List<ProducerData> producers() {
        KafkaProducerData.ProducerDataBuilder producerBuilder = ProducerData.builder()
            .channelName(channelName)
            .channelBinding(Map.of("kafka", new KafkaChannelBinding()))
            .operationBinding(Map.of("kafka", new KafkaOperationBinding()));
        List<Class<?>> events = List.of(
            EntityChangedEvent.class,
            EntityDeletedEvent.class
        );
        return events.stream()
            .map(eventClass -> producerBuilder.payloadType(eventClass).build())
            .collect(Collectors.toList());
    }

    /** Assembles the docket Springwolf renders into the AsyncAPI document. */
    @Bean
    public AsyncApiDocket asyncApiDocket() {
        return AsyncApiDocket.builder()
            .basePackage(CONSUMERS_BASE_PACKAGE) // package searched for KafkaListeners
            .info(applicationInfo())             // application info
            .producers(producers())              // documented producers
            .server("kafka", kafkaServer())      // kafka server info
            .build();
    }
}
The UI will be available at /springwolf/asyncapi-ui.html. Documentation in JSON format can be found at /springwolf/docs.