I was reading up on PostgreSQL when I noticed an interesting feature called logical replication. The documentation explains that logical replication is:
“A method of replicating data objects and their changes, based upon their replication identity (usually a primary key).”
With this library, which I named DotNetify.Postgres, your project won’t involve polling the database or setting up a complicated pub/sub system, and it won’t take much code at all. The rest of this blog will take you through building a demo app.
To enable logical replication in your PostgreSQL database, find the postgresql.conf configuration file, change the parameter wal_level to logical, and set both max_wal_senders and max_replication_slots to at least 1. You can also apply these settings with ALTER SYSTEM, as shown below. Either way, the changes take effect only after the service restarts.
ALTER SYSTEM SET wal_level='logical';
ALTER SYSTEM SET max_wal_senders='10';
ALTER SYSTEM SET max_replication_slots='10';
With the server configured, create a publication for the tables whose changes you want to track, and a logical replication slot that uses the pgoutput plugin:
CREATE PUBLICATION my_pub FOR ALL TABLES;
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
The pgoutput plugin is PostgreSQL’s standard logical decoding plugin for transforming the changes from the WAL to the logical replication protocol.
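DotNetify.Postgres consumes the slot for you, but to give you a feel for what pgoutput actually delivers, here’s a minimal sketch of reading the slot directly with Npgsql’s logical replication API. This is just an illustration, not the library’s code, and the connection string values are placeholders:

// Illustration only: stream changes from 'my_slot' with Npgsql (v6+).
// The connection string values here are placeholders for this sketch.
using System;
using System.Threading;
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;

var cts = new CancellationTokenSource();

await using var conn = new LogicalReplicationConnection(
   "Host=localhost;Username=my_user;Password=my_pwd;Database=my_db");
await conn.Open();

// Attach to the slot we created earlier and stream changes from 'my_pub'.
var slot = new PgOutputReplicationSlot("my_slot");
await foreach (var message in conn.StartReplication(
   slot, new PgOutputReplicationOptions("my_pub", 1), cts.Token))
{
   // Messages arrive as typed objects (inserts, updates, deletes),
   // wrapped in begin/commit transaction markers.
   Console.WriteLine($"Received {message.GetType().Name} at {message.WalEnd}");

   // Acknowledge progress so the server can recycle old WAL segments.
   conn.SetReplicationStatus(message.WalEnd);
}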
Next, we create a table named businesses for the demo:
CREATE TABLE IF NOT EXISTS businesses (
   business_id serial PRIMARY KEY,
   business_name VARCHAR ( 50 ) UNIQUE NOT NULL,
   rating integer
);
We also create a dedicated user for the app and grant it the privileges it needs:
CREATE USER my_user WITH PASSWORD 'my_pwd';
ALTER ROLE my_user WITH REPLICATION;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO my_user;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO my_user;
Notice that we give the user the REPLICATION attribute, which is required for subscribing to replication slots.
[Table("businesses")]
public class Business
{
[Column("business_id")]
[Key]
public long Id { get; set; }
[Column("business_name")]
public string Name { get; set; }
[Column("rating")]
public int Rating { get; set; }
}
Then we create a dotNetify view model that serves a real-time list of Business objects and keeps it updated by subscribing to database change events:

public class BusinessesVM : BaseVM
{
   private IDisposable _subs;

   // Real-time list; see https://dotnetify.net/core/api/crud.
   [ItemKey(nameof(Business.Id))]
   public List<Business> Businesses { get; set; }

   public BusinessesVM(IDbChangeObserver dbChangeObserver)
   {
      Businesses = new List<Business>();

      _subs = dbChangeObserver
         .Observe<Business>()
         .Subscribe(e =>
         {
            if (e is DbInsertEvent<Business> insertEvent)
               this.AddList(nameof(Businesses), insertEvent.Row);
            else if (e is DbUpdateEvent<Business> updateEvent)
               this.UpdateList(nameof(Businesses), updateEvent.NewRow);
            else if (e is DbDeleteEvent<Business> deleteEvent)
               this.RemoveList(nameof(Businesses), deleteEvent.Row.Id);

            PushUpdates();
         });
   }

   public override void Dispose() => _subs.Dispose();
}
Finally, we register dotNetify and DotNetify.Postgres in the ASP.NET Core startup class, pointing the library to our publication and replication slot:

public void ConfigureServices(IServiceCollection services)
{
   services.AddSignalR();
   services.AddDotNetify();
   services.AddDotNetifyPostgres(new PostgresConfiguration
   {
      ConnectionString = Configuration.GetConnectionString("Postgres"),
      PublicationName = "my_pub",
      ReplicationSlotName = "my_slot"
   });
}
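One thing to note if you’re starting from scratch: dotNetify runs on SignalR, so the app pipeline also needs the hub endpoint mapped. Under the usual setup (check the dotNetify documentation for your exact version), it looks roughly like this:

public void Configure(IApplicationBuilder app)
{
   app.UseWebSockets();
   app.UseDotNetify();
   app.UseRouting();

   // Map the SignalR hub that dotNetify clients connect to.
   app.UseEndpoints(endpoints => endpoints.MapHub<DotNetifyHub>("/dotnetify"));
}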
And that’s all there is to it! Here’s how the app reacts to data changes:

Next, let’s give the app the ability to change the data itself. We’ll use Entity Framework Core, starting with a DbContext for the Business entity:
public class BusinessDbContext : DbContext
{
   public DbSet<Business> Businesses { get; set; }

   public BusinessDbContext(DbContextOptions<BusinessDbContext> options)
      : base(options) {}
}
DotNetify view models are long-lived, so they must use a factory to create a short-lived DbContext for each operation instead of injecting one directly. We configure the DbContextFactory service in the startup class:
public void ConfigureServices(IServiceCollection services)
{
   ...
   services.AddDbContextFactory<BusinessDbContext>(options =>
      options.UseNpgsql(Configuration.GetConnectionString("Postgres")));
}
The last step is implementing CRUD methods on the view model:
public class BusinessesVM : BaseVM
{
   private readonly IDbContextFactory<BusinessDbContext> _contextFactory;

   [ItemKey(nameof(Business.Id))]
   public List<Business> Businesses { get; set; }

   public BusinessesVM(
      IDbContextFactory<BusinessDbContext> dbContextFactory,
      IDbChangeObserver dbChangeObserver)
   {
      _contextFactory = dbContextFactory;

      using var dbContext = _contextFactory.CreateDbContext();
      Businesses = dbContext.Businesses.OrderBy(x => x.Id).ToList();

      this.ObserveList<Business>(nameof(Businesses), dbChangeObserver);
   }

   public void Add(Business businessInfo)
   {
      using var dbContext = _contextFactory.CreateDbContext();
      dbContext.Businesses.Add(businessInfo);
      dbContext.SaveChanges();
   }

   public void Update(Business businessInfo)
   {
      using var dbContext = _contextFactory.CreateDbContext();
      var business = dbContext.Businesses.Find(businessInfo.Id);
      if (business != null)
      {
         business.Name = businessInfo.Name;
         business.Rating = businessInfo.Rating;
         dbContext.SaveChanges();
      }
   }

   public void Remove(Business businessInfo)
   {
      using var dbContext = _contextFactory.CreateDbContext();
      var business = dbContext.Businesses.Find(businessInfo.Id);
      if (business != null)
      {
         dbContext.Businesses.Remove(business);
         dbContext.SaveChanges();
      }
   }
}
As you may notice, the previous code that handled the subscription and data change events is now simplified into a single call to the ObserveList extension method. On the front end, a React component connects to the view model and dispatches the CRUD actions:
export const Businesses = () => {
  const { vm, state } = useConnect<State>("BusinessesVM");
  const [newName, setNewName] = useState<string>("");

  const addBusiness = (name: string) => {
    vm.$dispatch({ Add: new Business(0, name) });
    setNewName("");
  };

  const updateBusiness = (id: number, name: string, rating: number) => {
    vm.$dispatch({ Update: new Business(id, name, rating) });
  };

  const removeBusiness = (id: number) => {
    vm.$dispatch({ Remove: new Business(id) });
  };
  ...
And with that, here’s a demo of two browser instances of the app staying in sync with each other while we apply some CRUD operations:
I hope this is useful to you! I’d be interested to hear what you think, so let me know. And feel free to retweet!