visit
The locking mechanism is used to manage access to resources shared by user databases, tables, pages, and rows to guarantee the consistency of the transmitted data. There are two types of locking in databases, Optimistic Locking, and Pessimistic locking.
Optimistic locking is a technique for SQL database applications that do not hold row locks between selecting and updating or deleting a row. If the row does change, the update or delete will fail, and the application logic handles such failures by, for example, retrying the whole process.⁽¹⁾
The pessimistic locking model prevents simultaneous updates to records. As soon as one user starts to update a record, a lock is placed on it. Other users who attempt to update this record are informed that another user has an update in progress. The other users must wait until the first user has finished committing their changes, releasing the record lock. Only then can another user make changes based on the previous user's changes.⁽²⁾
For this implementation, we will create a use case with two repositories implementation, one using pessimistic and the other optimistic approaches, to simulate deposit action in the account table. For the sake of learning, we use 2 steps in this implementation, firstly we will select the account
record, modify in GO, then update to the PostgreSQL.
BEGIN;
SELECT * FROM account WHERE username = $1 LIMIT 1 FOR NO KEY UPDATE;
UPDATE account SET balance = $1, version = version+1 WHERE id = $2;
COMMIT;
FOR NO KEY UPDATE
will tell the database to lock the row for updates that only modify non-key fields.
const (
selectAccountLock = "SELECT * FROM account WHERE username = $1 LIMIT 1 FOR NO KEY UPDATE;"
depositAccount = "UPDATE account SET balance = $1, version = version+1 WHERE id = $2;"
)
func (a *accountRepository) Deposit(ctx context.Context, request *entity.DepositRequest) error {
// declare the transaction
tx, err := a.db.Beginx()
if err != nil {
fmt.Println(err)
return entity.ErrInternalServer
}
// select account from database
rows, err := tx.QueryxContext(ctx, selectAccountLock, request.Username)
if err != nil {
return entity.ErrInternalServer
}
var account entity.Account
for rows.Next() {
err = rows.StructScan(&account)
if err != nil {
return entity.ErrInternalServer
}
}
account.Balance += request.Amount
_, err = tx.ExecContext(ctx, depositAccount, account.Balance, account.ID)
if err != nil {
tx.Rollback()
fmt.Println(err)
return entity.ErrInternalServer
}
tx.Commit()
return nil
}
SELECT * FROM account WHERE username = $1 LIMIT 1;
UPDATE account SET balance = $1, version = version + 1 WHERE id = $2 AND version = $3;
If we look at the update query there is a version = $3
on the where clause, it is used to check if there is any other modification on the row between our select
and update
queries. If the update
query return zero rows affected
it means the row has been modified between our select
and update
queries, then we have to retry those two queries.
const (
maxRetries = 3
selectAccountByUsername = "SELECT * FROM account WHERE username = $1 LIMIT 1;"
depositAccountOpt = "UPDATE account SET balance = $1, version = version+1 " +
"WHERE id = $2 AND version = $3;"
)
func (a *accountRepositoryOpt) Deposit(ctx context.Context, request *entity.DepositRequest) error {
return a.depositOpt(ctx, request.Username, request.Amount, 0)
}
func (a *accountRepositoryOpt) depositOpt(ctx context.Context, username string, amount float64, retry int) error {
// check retry count
if retry >= maxRetries {
return entity.ErrMaxRetry
}
// select account
rows, err := a.db.QueryxContext(ctx, selectAccountByUsername, username)
if err != nil {
return entity.ErrAccountByUsernameNotFound(username)
}
var account entity.Account
for rows.Next() {
err = rows.StructScan(&account)
if err != nil {
return entity.ErrInternalServer
}
}
// add account balance with the depositOpt amount
account.Balance += amount
// update new account to db
result, err := a.db.ExecContext(ctx, depositAccountOpt, account.Balance, account.ID, account.Version)
if err != nil {
fmt.Println(err)
return entity.ErrInternalServer
}
// check rows affected count
rowsAffected, err := result.RowsAffected()
if err != nil {
fmt.Println(err)
return entity.ErrInternalServer
}
// retry if count rows affected by update less than one
if rowsAffected == 0 {
return a.depositOpt(ctx, username, amount, retry+1) // recursion
}
return nil
}
We use the recursion function to call retry if the rows affected are zero, and we will terminate recursion when retry
is equal to the maximum number of retries (maxRetries
).
func Test_accountUseCase_Deposit(t *testing.T) {
type fields struct {
accountRepo accountRepo
}
ctx := context.Background()
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "pessimistic",
fields: fields{
accountRepo: repository.NewAccountRepository(db),
},
wantErr: false,
},
{
name: "optimistic",
fields: fields{
accountRepo: repository.NewAccountRepositoryOpt(db),
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &accountUseCase{
accountRepo: tt.fields.accountRepo,
}
start := time.Now()
if err := testSaveBulkData(a, ctx); (err != nil) != tt.wantErr {
t.Errorf("Deposit() error = %v, wantErr %v", err, tt.wantErr)
}
fmt.Println("Execution time", time.Since(start))
})
}
}
func testSaveBulkData(a *accountUseCase, ctx context.Context) error {
sem := make(chan struct{}, 100)
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go save(a, ctx, strconv.Itoa((i%100)+1), sem, &wg)
}
wg.Wait()
return nil
}
func save(a *accountUseCase, ctx context.Context, username string, sem chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
sem <- struct{}{}
err := a.Deposit(ctx, &entity.DepositRequest{Username: username, Amount: 1000})
if err != nil {
fmt.Println("Error", err)
}
<-sem
}
Optimistic | Pessimistic |
---|---|
|
|
|
|
Optimistic | Pessimistic |
---|---|
|
|
|
|
Optimistic | Pessimistic |
---|---|
|
|
|
|