Welcome to the final part of the series. In this article, we will explore how to detect high traffic anomalies using Spring State Machine (SSM) and Spring Reactor.
The series began with keyword-based detection in Part 1 and then turned to error-rate analysis in Part 2.
Step 1: Update the existing model
public enum LogEvent {
    KEYWORD_DETECTED, HIGH_FREQUENCY, HIGH_ERROR_RATE, NORMAL_ACTIVITY
}
public enum LogState {
    NORMAL, KEYWORD_ALERT, FREQUENCY_ALERT, ERROR_RATE_ALERT
}
Step 2: Update the state transitions
@Override
public void configure(StateMachineTransitionConfigurer<LogState, LogEvent> transitions) throws Exception {
    transitions
            // NORMAL -> alert state, one transition per detector event
            .withExternal()
                .source(LogState.NORMAL).target(LogState.KEYWORD_ALERT)
                .event(LogEvent.KEYWORD_DETECTED)
                .and()
            .withExternal()
                .source(LogState.NORMAL).target(LogState.FREQUENCY_ALERT)
                .event(LogEvent.HIGH_FREQUENCY)
                .and()
            .withExternal()
                .source(LogState.NORMAL).target(LogState.ERROR_RATE_ALERT)
                .event(LogEvent.HIGH_ERROR_RATE)
                .and()
            // Alert state -> NORMAL once activity returns to normal
            .withExternal()
                .source(LogState.KEYWORD_ALERT).target(LogState.NORMAL)
                .event(LogEvent.NORMAL_ACTIVITY)
                .and()
            .withExternal()
                .source(LogState.FREQUENCY_ALERT).target(LogState.NORMAL)
                .event(LogEvent.NORMAL_ACTIVITY)
                .and()
            .withExternal()
                .source(LogState.ERROR_RATE_ALERT).target(LogState.NORMAL)
                .event(LogEvent.NORMAL_ACTIVITY);
}
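Spring State Machine only accepts an event when a transition for it is configured from the current state. To make the consequences of the table above easy to check, the sketch below mirrors it as a plain EnumMap lookup. This is not the Spring State Machine API; `TransitionTable` and `next` are hypothetical names, and an empty result stands for an event that would not be accepted.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for the Step 2 configuration (not Spring State Machine itself):
// source state -> (event -> target state).
final class TransitionTable {
    private static final Map<LogState, Map<LogEvent, LogState>> TABLE = new EnumMap<>(LogState.class);

    static {
        add(LogState.NORMAL, LogEvent.KEYWORD_DETECTED, LogState.KEYWORD_ALERT);
        add(LogState.NORMAL, LogEvent.HIGH_FREQUENCY, LogState.FREQUENCY_ALERT);
        add(LogState.NORMAL, LogEvent.HIGH_ERROR_RATE, LogState.ERROR_RATE_ALERT);
        add(LogState.KEYWORD_ALERT, LogEvent.NORMAL_ACTIVITY, LogState.NORMAL);
        add(LogState.FREQUENCY_ALERT, LogEvent.NORMAL_ACTIVITY, LogState.NORMAL);
        add(LogState.ERROR_RATE_ALERT, LogEvent.NORMAL_ACTIVITY, LogState.NORMAL);
    }

    private static void add(LogState source, LogEvent event, LogState target) {
        TABLE.computeIfAbsent(source, s -> new EnumMap<>(LogEvent.class)).put(event, target);
    }

    // Empty result = no transition configured for this event in this state.
    static Optional<LogState> next(LogState current, LogEvent event) {
        return Optional.ofNullable(TABLE.getOrDefault(current, Map.of()).get(event));
    }

    public static void main(String[] args) {
        System.out.println(next(LogState.NORMAL, LogEvent.HIGH_FREQUENCY));           // Optional[FREQUENCY_ALERT]
        System.out.println(next(LogState.FREQUENCY_ALERT, LogEvent.HIGH_FREQUENCY));  // Optional.empty
        System.out.println(next(LogState.FREQUENCY_ALERT, LogEvent.NORMAL_ACTIVITY)); // Optional[NORMAL]
    }
}

// Same constants as the enums in Step 1.
enum LogState { NORMAL, KEYWORD_ALERT, FREQUENCY_ALERT, ERROR_RATE_ALERT }
enum LogEvent { KEYWORD_DETECTED, HIGH_FREQUENCY, HIGH_ERROR_RATE, NORMAL_ACTIVITY }
```

The second line is the one worth noticing: every alert transition starts from NORMAL, so once a machine is in an alert state, further alert events for it go nowhere until something sends NORMAL_ACTIVITY.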
Step 3: Detect high-traffic anomalies
private Mono<AnomalyResult> detectHighTraffic(LogEntry entry, StateMachine<LogState, LogEvent> stateMachine,
                                              ConcurrentHashMap<String, Integer> ipFrequencyMap) {
    // getOrDefault avoids a NullPointerException (unboxing null) for an IP that has no count yet
    if (ipFrequencyMap.getOrDefault(entry.ipAddress(), 0) > frequencyThreshold) {
        Mono<Message<LogEvent>> event = Mono.just(MessageBuilder.withPayload(LogEvent.HIGH_FREQUENCY).build());
        Flux<StateMachineEventResult<LogState, LogEvent>> results = stateMachine.sendEvent(event);
        // Report an anomaly only if the state machine accepted the HIGH_FREQUENCY event
        return results.next().flatMap(result -> {
            if (result.getResultType() == StateMachineEventResult.ResultType.ACCEPTED) {
                return Mono.just(new AnomalyResult(entry, LogState.FREQUENCY_ALERT));
            } else {
                return Mono.empty();
            }
        });
    }
    return Mono.empty();
}
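The counts that detectHighTraffic reads are maintained by updateOccurrence, which increments the per-IP request count for every entry and the per-IP error count for error entries: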
private void updateOccurrence(LogEntry entry, ConcurrentHashMap<String, Integer> ipFrequencyMap,
                              ConcurrentHashMap<String, Integer> ipErrorMap) {
    // merge() atomically inserts 1 for a new IP or adds 1 to the existing count
    ipFrequencyMap.merge(entry.ipAddress(), 1, Integer::sum);
    if (entry.isError()) {
        ipErrorMap.merge(entry.ipAddress(), 1, Integer::sum);
    }
}
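The counting and the threshold comparison can be exercised without Spring. In the JDK-only sketch below, `LogEntry` is a hypothetical record exposing only the two accessors used above (`ipAddress()` and `isError()`), and the threshold of 3 is an assumed value, since the article does not state the one it uses.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

final class OccurrenceCounterDemo {
    // Hypothetical stand-in for the article's LogEntry (only the accessors used above).
    record LogEntry(String ipAddress, boolean isError) {}

    // Same bookkeeping as updateOccurrence: merge(key, 1, Integer::sum) inserts 1 or adds 1.
    static void updateOccurrence(LogEntry entry,
                                 ConcurrentHashMap<String, Integer> ipFrequencyMap,
                                 ConcurrentHashMap<String, Integer> ipErrorMap) {
        ipFrequencyMap.merge(entry.ipAddress(), 1, Integer::sum);
        if (entry.isError()) {
            ipErrorMap.merge(entry.ipAddress(), 1, Integer::sum);
        }
    }

    // The comparison detectHighTraffic performs before sending HIGH_FREQUENCY.
    static boolean exceedsThreshold(String ip, ConcurrentHashMap<String, Integer> counts, int threshold) {
        return counts.getOrDefault(ip, 0) > threshold;
    }

    public static void main(String[] args) {
        int frequencyThreshold = 3; // hypothetical value
        ConcurrentHashMap<String, Integer> freq = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, Integer> errors = new ConcurrentHashMap<>();
        List<LogEntry> entries = List.of(
                new LogEntry("10.0.0.1", false),
                new LogEntry("10.0.0.1", true),
                new LogEntry("10.0.0.1", false),
                new LogEntry("10.0.0.1", false),
                new LogEntry("10.0.0.2", true));
        for (LogEntry e : entries) {
            updateOccurrence(e, freq, errors);
            System.out.println(e.ipAddress() + " count=" + freq.get(e.ipAddress())
                    + " alert=" + exceedsThreshold(e.ipAddress(), freq, frequencyThreshold));
        }
        System.out.println("errors=" + errors);
    }
}
```

Because the comparison is strict (`>`), the fourth request from 10.0.0.1 is the first one that crosses a threshold of 3.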
Step 4: Integrate all the detection techniques
This method implements a cascading anomaly detection strategy by applying several checks to each log entry in sequence. It first checks the entry for a high error rate using detectHighErrorRate. If no significant error rate is found, it checks for abnormal traffic using detectHighTraffic. If the traffic looks normal, it searches for keyword-based anomalies with detectKeyword. If none of these checks reports an anomaly, the method returns an AnomalyResult with the NORMAL state.
private Mono<AnomalyResult> detectAnomaly(LogEntry entry, StateMachine<LogState, LogEvent> stateMachine,
                                          ConcurrentHashMap<String, Integer> ipFrequencyMap,
                                          ConcurrentHashMap<String, Integer> ipErrorMap) {
    // Mono.defer runs each fallback check only after the previous one completes empty;
    // without it, the fallback methods would be called while the chain is being assembled
    return detectHighErrorRate(entry, stateMachine, ipErrorMap)
            .switchIfEmpty(Mono.defer(() -> detectHighTraffic(entry, stateMachine, ipFrequencyMap)))
            .switchIfEmpty(Mono.defer(() -> detectKeyword(entry, stateMachine)))
            .switchIfEmpty(Mono.just(new AnomalyResult(entry, LogState.NORMAL)));
}
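The fallback chain behaves much like the JDK's Optional.or (Java 9+), which takes a Supplier and only calls it when the previous result is empty. The sketch below uses that as a stand-in for Mono.switchIfEmpty; it is not Reactor code, the boolean inputs are hypothetical detector outcomes, and the `calls` list records which checks actually ran. Note that switchIfEmpty itself takes an already-built Mono, so a method call passed to it directly is evaluated when the chain is assembled; Mono.defer gives it the same lazy behavior as the Supplier here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class CascadeDemo {
    // Records which detectors actually ran, to show that later checks are skipped.
    static final List<String> calls = new ArrayList<>();

    // Hypothetical detector: returns its alert name when it fires, empty otherwise.
    static Optional<String> check(String alert, boolean fires) {
        calls.add(alert);
        return fires ? Optional.of(alert) : Optional.empty();
    }

    // Same order as detectAnomaly: error rate, then traffic, then keyword, else NORMAL.
    static String detect(boolean highErrorRate, boolean highTraffic, boolean keyword) {
        calls.clear();
        return check("ERROR_RATE_ALERT", highErrorRate)
                .or(() -> check("FREQUENCY_ALERT", highTraffic))
                .or(() -> check("KEYWORD_ALERT", keyword))
                .orElse("NORMAL");
    }

    public static void main(String[] args) {
        System.out.println(detect(false, true, true) + " " + calls);
        System.out.println(detect(false, false, false) + " " + calls);
    }
}
```

The first call reports FREQUENCY_ALERT and never runs the keyword check, even though it would also have fired; earlier checks take priority.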
Step 5: Process log files
The processLogsForGivenWindow method processes the log entries of a single time window and applies anomaly detection through a state machine. It first obtains a state machine instance from stateMachineFactory.getStateMachine() and creates two ConcurrentHashMaps: ipFrequencyMap, which counts the entries seen for each IP address, and ipErrorMap, which counts the error entries for each IP address.
The method then starts the state machine reactively with stateMachine.startReactively() and processes the entries Flux. For each log entry, updateOccurrence updates the frequency and error counts in the two maps, and detectAnomaly then uses the state machine and the updated maps to classify the entry. When the stream terminates, the state machine is stopped with stateMachine.stopReactively(). The result is a Flux of AnomalyResult objects, one per log entry, with the NORMAL state for entries in which no anomaly was detected.
private Flux<AnomalyResult> processLogsForGivenWindow(Flux<LogEntry> entries) {
    StateMachine<LogState, LogEvent> stateMachine = stateMachineFactory.getStateMachine();
    // Fresh counters per call, so counts do not carry over between windows
    ConcurrentHashMap<String, Integer> ipFrequencyMap = new ConcurrentHashMap<>();
    ConcurrentHashMap<String, Integer> ipErrorMap = new ConcurrentHashMap<>();
    return stateMachine.startReactively()
            .thenMany(entries
                    .doOnNext(entry -> updateOccurrence(entry, ipFrequencyMap, ipErrorMap))
                    .flatMap(entry -> detectAnomaly(entry, stateMachine, ipFrequencyMap, ipErrorMap)))
            // stopReactively() returns a Mono, which does nothing until it is subscribed
            .doFinally(signalType -> stateMachine.stopReactively().subscribe());
}
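Because processLogsForGivenWindow allocates new maps on every call, counts start from zero in each window. The article does not show how the incoming stream is split into windows; in Reactor an operator such as Flux.window(Duration) could do it, but that is an assumption. The JDK-only sketch below illustrates the effect with fixed, non-overlapping windows: entries are bucketed by a hypothetical epoch-millisecond timestamp field and counted per IP within each bucket.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

final class WindowDemo {
    // Hypothetical entry: assumes each log entry carries an epoch-millis timestamp.
    record LogEntry(long timestampMillis, String ipAddress) {}

    // Assigns each entry to a fixed (tumbling) window and counts per IP inside it,
    // using a fresh map per window just as each processLogsForGivenWindow call does.
    static Map<Long, Map<String, Integer>> countPerWindow(List<LogEntry> entries, long windowMillis) {
        Map<Long, Map<String, Integer>> windows = new TreeMap<>();
        for (LogEntry e : entries) {
            long windowStart = Math.floorDiv(e.timestampMillis(), windowMillis) * windowMillis;
            windows.computeIfAbsent(windowStart, k -> new ConcurrentHashMap<>())
                   .merge(e.ipAddress(), 1, Integer::sum);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<LogEntry> entries = List.of(
                new LogEntry(1_000, "10.0.0.1"),
                new LogEntry(2_000, "10.0.0.1"),
                new LogEntry(61_000, "10.0.0.1"));
        // Window length of 60 seconds is an assumed value.
        System.out.println(countPerWindow(entries, 60_000));
    }
}
```

The third request lands in the next window and is counted from 1 again, so a burst that straddles a window boundary is split between two counters.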
Step 6: Expose as an API
The AnomalyDetectionController is a Spring Boot REST controller that handles HTTP POST requests for detecting anomalies in log data. It delegates the detection itself to an injected AnomalyDetectionService. When a POST request with a Flux of log entries in its body reaches the /detect-anomalies endpoint, the detectAnomalies method passes the entries to AnomalyDetectionService, which processes them and returns a reactive stream (Flux) of AnomalyResult objects.
@RestController
public class AnomalyDetectionController {

    private final AnomalyDetectionService anomalyDetectionService;

    public AnomalyDetectionController(AnomalyDetectionService anomalyDetectionService) {
        this.anomalyDetectionService = anomalyDetectionService;
    }

    @PostMapping("/detect-anomalies")
    public Flux<AnomalyResult> detectAnomalies(@RequestBody Flux<LogEntry> logEntries) {
        return anomalyDetectionService.detectAnomalies(logEntries);
    }
}
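For manual testing outside Spring, the endpoint can be called with the JDK's java.net.http client (Java 11+). This is a minimal sketch under assumptions: the service is taken to run locally on Spring Boot's default port 8080, and the JSON field names in the payload (`ipAddress`, `message`) are guesses, since the article does not show how LogEntry is serialized.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class DetectAnomaliesClient {
    // Hypothetical address: assumes a local service on Spring Boot's default port.
    static final URI ENDPOINT = URI.create("http://localhost:8080/detect-anomalies");

    // Builds a POST whose body is a JSON array of log entries.
    static HttpRequest buildRequest(String jsonArray) {
        return HttpRequest.newBuilder(ENDPOINT)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonArray))
                .build();
    }

    // Sends the request and returns the raw response body; requires the service to be running.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Field names are assumptions and must match the real LogEntry JSON mapping.
        String payload = "[{\"ipAddress\":\"10.0.0.1\",\"message\":\"GET /home\"}]";
        HttpRequest request = buildRequest(payload);
        System.out.println(request.method() + " " + request.uri());
        // Once the service is up, send(request) returns the AnomalyResult JSON.
    }
}
```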
Step 7: Testing
Below is a sample payload for which the system detects high traffic, resulting in a FREQUENCY_ALERT.
Below is a sample payload for which the system detects suspicious keywords, resulting in a KEYWORD_ALERT.
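To reason about what such payloads should produce, the JDK-only simulation below replays one window sequentially through the traffic and keyword checks, with a single in-memory state standing in for the window's state machine. It is a model, not the article's code: the error-rate branch from Part 2 is left out, and `LogEntry`, the threshold of 3, and the keyword "unauthorized" are hypothetical. It also assumes no detector sends NORMAL_ACTIVITY, so alerts can only be raised from NORMAL, matching the transition table in Step 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PayloadSimulation {
    // Subset of LogState; the error-rate branch is not modeled.
    enum State { NORMAL, KEYWORD_ALERT, FREQUENCY_ALERT }

    // Hypothetical entry shape; the article does not show all of LogEntry's fields.
    record LogEntry(String ipAddress, String message) {}

    static final int FREQUENCY_THRESHOLD = 3;                   // hypothetical threshold
    static final Set<String> KEYWORDS = Set.of("unauthorized"); // hypothetical keyword list

    // Replays one window in order: count, then traffic check, then keyword check.
    // Alerts fire only from NORMAL; once in an alert state, later alerts are rejected.
    static List<State> run(List<LogEntry> entries) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        State current = State.NORMAL;
        List<State> results = new ArrayList<>();
        for (LogEntry e : entries) {
            int count = counts.merge(e.ipAddress(), 1, Integer::sum);
            boolean highTraffic = count > FREQUENCY_THRESHOLD;
            boolean keyword = KEYWORDS.stream().anyMatch(k -> e.message().contains(k));
            State result = State.NORMAL;
            if (current == State.NORMAL && highTraffic) {
                current = State.FREQUENCY_ALERT;
                result = State.FREQUENCY_ALERT;
            } else if (current == State.NORMAL && keyword) {
                current = State.KEYWORD_ALERT;
                result = State.KEYWORD_ALERT;
            }
            results.add(result);
        }
        return results;
    }

    public static void main(String[] args) {
        List<LogEntry> burst = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            burst.add(new LogEntry("10.0.0.1", "GET /home"));
        }
        System.out.println(run(burst));
        System.out.println(run(List.of(new LogEntry("10.0.0.2", "unauthorized access attempt"))));
    }
}
```

In this model, a burst of five requests from one IP yields FREQUENCY_ALERT only for the fourth entry, the first to exceed the threshold; the fifth comes back NORMAL because the state is already FREQUENCY_ALERT. When writing test payloads for the real service, it may be worth checking whether it behaves the same way.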
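Performance Considerations:
When log entries arrive faster than the detectors can process them, the pipeline needs a backpressure strategy. Common options are: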
Buffering:
Temporarily store incoming data in a buffer until the consumer is ready to process it. Be mindful of buffer sizes to avoid memory overload.
Dropping:
Drop the excess data that cannot be processed immediately. This approach is useful when missing some data is acceptable.
Latest & Greatest:
Keep only the most recent item and discard older ones if the consumer cannot keep up.
Error Handling:
Emit an error, signaling to the producer that the consumer cannot handle more data.
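Reactor implements these four strategies as Flux operators: onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest and onBackpressureError. The JDK-only simulation below is not Reactor code; it only makes the differences visible. A consumer that can accept `demand` items right now receives a burst, and `deliver` (a hypothetical helper) returns what the consumer ends up with once it catches up.

```java
import java.util.ArrayList;
import java.util.List;

final class BackpressureDemo {
    enum Strategy { BUFFER, DROP, LATEST, ERROR }

    static List<Integer> deliver(List<Integer> burst, int demand, Strategy strategy) {
        List<Integer> delivered = new ArrayList<>();
        List<Integer> buffered = new ArrayList<>(); // BUFFER: overflow kept in arrival order (memory grows)
        Integer latest = null;                      // LATEST: single slot, overwritten by newer items
        for (Integer item : burst) {
            if (delivered.size() < demand) {
                delivered.add(item);                // consumer has demand: deliver immediately
                continue;
            }
            switch (strategy) {
                case BUFFER -> buffered.add(item);
                case DROP -> { }                    // discard the excess item
                case LATEST -> latest = item;
                case ERROR -> throw new IllegalStateException("no demand for item " + item);
            }
        }
        // The consumer catches up later and drains whatever the strategy kept.
        delivered.addAll(buffered);
        if (latest != null) {
            delivered.add(latest);
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> burst = List.of(1, 2, 3, 4, 5);
        for (Strategy s : Strategy.values()) {
            try {
                System.out.println(s + " -> " + deliver(burst, 2, s));
            } catch (IllegalStateException ex) {
                System.out.println(s + " -> error: " + ex.getMessage());
            }
        }
    }
}
```

With a demand of 2, BUFFER eventually delivers all five items, DROP delivers only the first two, LATEST delivers the first two plus item 5, and ERROR fails on item 3. In the pipeline above, one of these operators could be applied to the incoming Flux<LogEntry>, depending on whether losing log entries is acceptable.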
Conclusion:
A planned future enhancement is the integration of a notification system, which would considerably broaden the system's capabilities. Alerts could then be dispatched automatically to monitoring systems, so that the relevant personnel are informed without delay. It would also make it possible to trigger automated responses to common issues, streamlining issue resolution and reducing the need for manual intervention.