visit
Install the following NuGet package into the “DotnetAspireChallenge.AppHost” project:
dotnet add package Aspire.Hosting.Kafka
```csharp
var messaging = builder.AddKafka("messaging")
    .WithKafkaUI();
```
Finally, add a reference to the Kafka resource from both the producer (the “DotnetAspireChallenge.ApiService” project) and the consumer (the “DotnetAspireChallenge.Web” project).
```csharp
// Producer side: the API service receives a reference to the "messaging" Kafka resource.
var apiService = builder
    .AddProject<Projects.DotnetAspireChallenge_ApiService>("apiservice")
    .WithReference(messaging);

// Consumer side: the web front end references the cache, the API service and the
// same "messaging" Kafka resource.
builder
    .AddProject<Projects.DotnetAspireChallenge_Web>("webfrontend")
    .WithExternalHttpEndpoints()
    .WithReference(cache)
    .WithReference(apiService)
    .WithReference(messaging);
// Registers the Kafka producer for the connection named "messaging"; the /send endpoint
// takes it as an IProducer<string, string> parameter.
builder.AddKafkaProducer<string, string>("messaging");
/// <summary>
/// Endpoint registration for the Kafka producer sample.
/// </summary>
public static class AspireKafkaExtension
{
    /// <summary>
    /// Maps <c>GET /send</c>, which publishes a single message built from the
    /// <c>key</c> and <c>value</c> request parameters to the "messaging" topic
    /// and returns the delivery result.
    /// </summary>
    /// <param name="app">The application the endpoint is added to.</param>
    public static void MapAspireKafkaEndpoint(this WebApplication app)
    {
        // No try/catch here: a catch block that only rethrows adds nothing, so
        // produce failures simply propagate to the request pipeline.
        app.MapGet("/send", async (IProducer<string, string> producer, string key, string value) =>
        {
            var message = new Message<string, string> { Key = key, Value = value };
            return await producer.ProduceAsync("messaging", message);
        });
    }
}
// Example request: https://localhost:7313/send?key=key&value=1
// Registers the Kafka consumer for the connection named "messaging" and tunes its configuration.
builder.AddKafkaConsumer<string, string>("messaging", settings =>
{
    var consumerConfig = settings.Config;

    // Consumer group this application joins.
    consumerConfig.GroupId = "my-consumer-group";

    // Offset reset policy: Earliest.
    consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;

    // Automatic offset commits are switched off.
    consumerConfig.EnableAutoCommit = false;
});
@* Page at /kafka: shows the topic and value of one message obtained from KafkaConsumeMessageClient. *@
@page "/kafka"
@attribute [StreamRendering(true)]
@attribute [OutputCache(Duration = 5)]
@using Confluent.Kafka
@inject KafkaConsumeMessageClient KafkaClient

<PageTitle>Kafka Consumed Message</PageTitle>

<h3>KafkaConsumer</h3>
<h1>Kafka</h1>

<p>This component demonstrates showing data loaded from a backend API service.</p>

@if (!hasLoaded)
{
    <p><em>Loading...</em></p>
}
else if (consumedMessage is null)
{
    @* The client returned null, i.e. nothing was consumed within its polling window. *@
    <p><em>No message received.</em></p>
}
else
{
    <table class="table">
        <thead>
            <tr>
                <th>Topic</th>
                <th>Value</th>
            </tr>
        </thead>
        <tbody>
            <tr>
                <td>@consumedMessage.Topic</td>
                <td>@consumedMessage.Value</td>
            </tr>
        </tbody>
    </table>
}

@code {
    // Last message returned by the client; null until loaded or when nothing was consumed.
    private ConsumeResult<string, string>? consumedMessage;

    // Distinguishes "still loading" from "loaded, but no message arrived".
    private bool hasLoaded;

    protected override async Task OnInitializedAsync()
    {
        // GetKafkaMessage is a blocking call. Awaiting it on a worker thread lets this
        // method yield, so stream rendering can send the "Loading..." placeholder first.
        consumedMessage = await Task.Run(() => KafkaClient.GetKafkaMessage());
        hasLoaded = true;
    }
}
/// <summary>
/// Reads a single message from the "messaging" Kafka topic using the injected consumer.
/// </summary>
/// <remarks>
/// <c>httpClient</c> is not used by the members of this class; the constructor parameter
/// is kept so existing registrations and callers continue to work.
/// </remarks>
public class KafkaConsumeMessageClient(HttpClient httpClient, IConsumer<string, string> _consumer)
{
    private const string Topic = "messaging";

    // Maximum time a single call waits for a message.
    private static readonly TimeSpan PollTimeout = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Subscribes to the topic and waits up to ten seconds for one message.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait before the timeout elapses.</param>
    /// <returns>The consumed message, or <c>null</c> when none arrived within the timeout.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled.
    /// </exception>
    public ConsumeResult<string, string>? GetKafkaMessage(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _consumer.Subscribe(Topic);

        // Combine the caller's token with the poll timeout so either one ends the wait.
        using var pollWindow = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        pollWindow.CancelAfter(PollTimeout);

        try
        {
            return _consumer.Consume(pollWindow.Token);
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Only the timeout fired: report "no message", as the timed Consume overload did.
            return null;
        }
    }
}