interface EventLoop {
    void start();
    void stop();
}

interface Events {
    Optional<Event> next();
}

interface Event {
    void trigger(Script script);
}

interface Script {
    void run(JsonObject properties, Consumer<Instant> onSuccess, Consumer<Throwable> onFailure);
}
The Script interface deserves more attention. We can ask it to run with given properties and two callbacks, one for success and one for failure. The success callback receives the time at which the script ended. The failure callback receives the exception that occurred.
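To make the callback contract concrete, here is a minimal sketch of a Script implementation. GreetingScript and its "name" property are hypothetical, and the tiny JsonObject class is only a stand-in for whichever JSON library the real code uses, so that the snippet compiles on its own.

```java
import java.time.Instant;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for the article's JSON type (assumption), just enough to compile alone.
final class JsonObject {
    private final Map<String, String> values;

    JsonObject(final Map<String, String> values) {
        this.values = values;
    }

    String getString(final String key) {
        return values.get(key);
    }
}

interface Script {
    void run(JsonObject properties, Consumer<Instant> onSuccess, Consumer<Throwable> onFailure);
}

// Hypothetical script: greets the "name" property, or reports that it is missing.
final class GreetingScript implements Script {
    @Override
    public void run(final JsonObject properties, final Consumer<Instant> onSuccess, final Consumer<Throwable> onFailure) {
        final String name = properties.getString("name");
        if (name == null) {
            // Failure path: hand the exception to the caller instead of throwing it.
            onFailure.accept(new IllegalArgumentException("missing property: name"));
            return;
        }
        System.out.println("Hello, " + name);
        // Success path: report the script end time.
        onSuccess.accept(Instant.now());
    }
}
```

Exactly one of the two callbacks is invoked per run, which is what lets a caller react to the outcome without catching exceptions itself.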
final class BusyWaitingEventLoop implements EventLoop {
    private final Events events;
    private final Script script;
    private final AtomicBoolean alive;

    BusyWaitingEventLoop(final Events events, final Script script) {
        this(events, script, new AtomicBoolean(true));
    }

    BusyWaitingEventLoop(final Events events, final Script script, final AtomicBoolean alive) {
        this.events = events;
        this.script = script;
        this.alive = alive;
    }

    @Override
    public void start() {
        while (alive.get()) {
            events.next().ifPresent(event -> event.trigger(script));
        }
    }

    @Override
    public void stop() {
        alive.set(false);
    }
}
The responsibility of an ExecutorService is to run tasks. It can run them on several threads or on a single thread. Java provides both kinds of implementation through the factory methods of the Executors class.
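As a small illustration of the single-threaded and multi-threaded variants, the sketch below submits a few trivial tasks and records which threads ran them. ExecutorThreads and its methods are hypothetical names; Executors.newSingleThreadExecutor and Executors.newFixedThreadPool are the standard JDK factory methods.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorThreads {
    // Runs `tasks` trivial tasks on the executor and returns the names of the threads that ran them.
    static Set<String> namesOfThreadsUsed(final ExecutorService executor, final int tasks) {
        final Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> names.add(Thread.currentThread().getName()));
        }
        executor.shutdown(); // accept no new tasks; the queued ones still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return names;
    }

    static void demo() {
        // A single-thread executor reports exactly one thread name.
        System.out.println(namesOfThreadsUsed(Executors.newSingleThreadExecutor(), 8));
        // A fixed pool of four reports at most four thread names.
        System.out.println(namesOfThreadsUsed(Executors.newFixedThreadPool(4), 8));
    }
}
```

The calling code is identical in both cases, which is why the event loop below can take any ExecutorService without knowing how many threads sit behind it.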
final class MultithreadedEventLoop implements EventLoop {
    private final EventLoop origin;
    private final Integer nThreads;
    private final ExecutorService executorService;

    MultithreadedEventLoop(final EventLoop origin, final Integer nThreads, final ExecutorService executorService) {
        this.origin = origin;
        this.nThreads = nThreads;
        this.executorService = executorService;
    }

    @Override
    public void start() {
        for (var i = 0; i < nThreads; i++) {
            executorService.execute(origin::start);
        }
    }

    @Override
    public void stop() {
        origin.stop();
        shutdownExecutorService();
    }

    private void shutdownExecutorService() {
        // Java specific code
    }
}
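The body of shutdownExecutorService is left out above. One common way to write it, offered here only as an assumption about what it might contain, is the two-phase shutdown pattern described in the ExecutorService Javadoc: stop accepting tasks, wait a while, then interrupt whatever is still running. GracefulShutdown and its timeout parameters are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {
    // Returns true if the executor terminated within the allowed time.
    static boolean shutdown(final ExecutorService executorService, final long timeout, final TimeUnit unit) {
        executorService.shutdown(); // phase one: reject new tasks, let running ones finish
        try {
            if (executorService.awaitTermination(timeout, unit)) {
                return true;
            }
            executorService.shutdownNow(); // phase two: interrupt the tasks still running
            return executorService.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Because stop() first calls origin.stop(), the busy-waiting loops should leave their while loop on their own, so phase one is normally enough.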
We can start a MultithreadedEventLoop with eight threads in this way:
public final class Main {
    public static void main(String[] args) {
        var nThreads = 8;
        new MultithreadedEventLoop(
            new BusyWaitingEventLoop(
                anEventsObject,
                aScriptObject
            ),
            nThreads,
            Executors.newFixedThreadPool(nThreads)
        ).start();
    }
}
Here is an implementation that exploits the ExecutorService again:
final class AsyncScript implements Script {
    private final Script origin;
    private final ExecutorService executorService;

    AsyncScript(final Script origin, final ExecutorService executorService) {
        this.origin = origin;
        this.executorService = executorService;
    }

    @Override
    public void run(final JsonObject properties, final Consumer<Instant> onSuccess, final Consumer<Throwable> onFailure) {
        if (!executorService.isShutdown()) {
            executorService.execute(() -> origin.run(properties, onSuccess, onFailure));
        }
    }
}
AsyncScript is a Script decorator. With it, we can turn a synchronous Script into an asynchronous one.
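The effect of the decorator can be observed with a small experiment: wrap a synchronous script and check which thread it ends up running on. In the sketch below, AsyncScriptDemo is hypothetical, the empty JsonObject class is only a stand-in for the real JSON type, and AsyncScript is repeated so the snippet is self-contained.

```java
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class JsonObject { } // empty stand-in for the article's JSON type (assumption)

interface Script {
    void run(JsonObject properties, Consumer<Instant> onSuccess, Consumer<Throwable> onFailure);
}

// Same decorator as in the article, repeated so the sketch compiles alone.
final class AsyncScript implements Script {
    private final Script origin;
    private final ExecutorService executorService;

    AsyncScript(final Script origin, final ExecutorService executorService) {
        this.origin = origin;
        this.executorService = executorService;
    }

    @Override
    public void run(final JsonObject properties, final Consumer<Instant> onSuccess, final Consumer<Throwable> onFailure) {
        if (!executorService.isShutdown()) {
            executorService.execute(() -> origin.run(properties, onSuccess, onFailure));
        }
    }
}

final class AsyncScriptDemo {
    // Returns the name of the thread on which the wrapped script actually ran.
    static String threadRunningTheScript() {
        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        final AtomicReference<String> threadName = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        final Script synchronous = (properties, onSuccess, onFailure) -> {
            threadName.set(Thread.currentThread().getName());
            onSuccess.accept(Instant.now());
        };
        new AsyncScript(synchronous, executorService)
            .run(new JsonObject(), endTime -> done.countDown(), error -> done.countDown());
        try {
            done.await(5, TimeUnit.SECONDS); // run() returned at once; wait for the callback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
        }
        return threadName.get();
    }
}
```

The returned name belongs to a pool thread, not to the caller. One design point worth keeping in mind: when the executor is already shut down, run() returns without invoking either callback.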
Indeed, we can easily integrate AsyncScript into the previous example:
public final class Main {
    public static void main(String[] args) {
        var eventLoopThreads = 2;
        var scriptThreads = 14;
        var executorService = Executors.newFixedThreadPool(eventLoopThreads + scriptThreads);
        new MultithreadedEventLoop(
            new BusyWaitingEventLoop(
                anEventsObject,
                new AsyncScript(
                    aScriptObject,
                    executorService
                )
            ),
            eventLoopThreads,
            executorService
        ).start();
    }
}
Now we have an ExecutorService with sixteen threads. The event loop occupies two of them, which leaves fourteen for the scripts. So the event loop receives new events on two threads, while the scripts run on the other fourteen.
interface EventLoop {
    void start();
    void stop();
}

interface Events<I, O> {
    Optional<Event<I, O>> next();
}

interface Event<I, O> {
    void trigger(Script<I, O> script);
}

interface Script<I, O> {
    void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}
In this way we can handle generic input and output in the Script.
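To show how the type parameters fit together, here is a minimal sketch with a String input and an Integer output. ValueEvent and LengthScript are hypothetical names invented for the illustration; only the two interfaces come from the article.

```java
import java.util.function.Consumer;

interface Script<I, O> {
    void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}

interface Event<I, O> {
    void trigger(Script<I, O> script);
}

// Hypothetical event that carries its own input and callbacks.
final class ValueEvent<I, O> implements Event<I, O> {
    private final I input;
    private final Consumer<O> onSuccess;
    private final Consumer<Throwable> onFailure;

    ValueEvent(final I input, final Consumer<O> onSuccess, final Consumer<Throwable> onFailure) {
        this.input = input;
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    @Override
    public void trigger(final Script<I, O> script) {
        // The event decides what the script receives and who hears about the outcome.
        script.run(input, onSuccess, onFailure);
    }
}

// Hypothetical script: takes a String and produces its length.
final class LengthScript implements Script<String, Integer> {
    @Override
    public void run(final String in, final Consumer<Integer> onSuccess, final Consumer<Throwable> onFailure) {
        if (in == null) {
            onFailure.accept(new IllegalArgumentException("input must not be null"));
        } else {
            onSuccess.accept(in.length());
        }
    }
}
```

For example, `new ValueEvent<String, Integer>("hello", System.out::println, Throwable::printStackTrace).trigger(new LengthScript())` prints 5. The compiler now rejects a script whose input or output type does not match the event that triggers it.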
AggregateEvents aggregates Events objects in general. In my case, it aggregates multiple Events that emit AzureQueueMessage objects. Each AzureQueueMessage triggers the script with the message body as properties and, on completion, removes itself from the queue.
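The source of AggregateEvents is not shown here. A minimal sketch of such an aggregate, assuming it simply asks each underlying Events object in turn and returns the first event it finds, could look like this:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

interface Script<I, O> {
    void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}

interface Event<I, O> {
    void trigger(Script<I, O> script);
}

interface Events<I, O> {
    Optional<Event<I, O>> next();
}

// Assumed behaviour: poll the sources in order and return the first available event.
final class AggregateEvents<I, O> implements Events<I, O> {
    private final List<Events<I, O>> sources;

    AggregateEvents(final List<Events<I, O>> sources) {
        this.sources = List.copyOf(sources); // defensive, immutable copy
    }

    @Override
    public Optional<Event<I, O>> next() {
        for (final Events<I, O> source : sources) {
            final Optional<Event<I, O>> event = source.next();
            if (event.isPresent()) {
                return event;
            }
        }
        return Optional.empty(); // no source had anything to offer
    }
}
```

Polling in a fixed order is the simplest choice, but a busy first source can starve the later ones. Rotating the starting index between calls would be one way to spread the load.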
We can improve the presented implementation in various ways. For example, we can add more capabilities to the event loop, such as restarting. But, as always, keep the modeled domain in mind. There may be further opportunities as well. For example, I can imagine a prioritised Events implementation, or event loops with different ExecutorService implementations. In this way, we could have finer-grained control over threads or priorities.
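As a rough idea of what a prioritised Events implementation might look like, the sketch below keeps pending events in a PriorityBlockingQueue from the JDK. PrioritisedEvents, its add method, and the convention that a lower number means a higher priority are all assumptions made for the illustration.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Consumer;

interface Script<I, O> {
    void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}

interface Event<I, O> {
    void trigger(Script<I, O> script);
}

interface Events<I, O> {
    Optional<Event<I, O>> next();
}

final class PrioritisedEvents<I, O> implements Events<I, O> {
    // Pairs an event with its priority; a lower number is served first (assumption).
    private static final class Entry<I, O> {
        final int priority;
        final Event<I, O> event;

        Entry(final int priority, final Event<I, O> event) {
            this.priority = priority;
            this.event = event;
        }
    }

    private final PriorityBlockingQueue<Entry<I, O>> queue =
        new PriorityBlockingQueue<>(11, Comparator.comparingInt((Entry<I, O> entry) -> entry.priority));

    // Thread-safe: producers may add events while event loop threads call next().
    void add(final int priority, final Event<I, O> event) {
        queue.add(new Entry<>(priority, event));
    }

    @Override
    public Optional<Event<I, O>> next() {
        // poll() never blocks; it returns null when the queue is empty.
        return Optional.ofNullable(queue.poll()).map(entry -> entry.event);
    }
}
```

Events with equal priority come out in no guaranteed order, so a sequence number would be needed as a tie-breaker if first-in, first-out behaviour matters.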