Forked from
Eclipse Projects / xfsc / Organisational Credential Manager - W-Stack / Status List Service
40 commits behind the upstream repository.
-
Manuel Heß authoredManuel Heß authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
postgres.go 6.90 KiB
package database
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
entity2 "gitlab.eclipse.org/eclipse/xfsc/organisational-credential-manager-w-stack/status-list-service/internal/entity"
"regexp"
)
// TODO: queries as constants?
type postgresConnection struct {
conn *pgxpool.Pool
blockSizeInBytes int
}
func newPostgresConnection(username string, password string, host string, port int, db string, blockSizeInBytes int) (DbConnection, error) {
connUrl := fmt.Sprintf("postgres://%s:%s@%s:%d/%s", username, password, host, port, db)
conn, err := pgxpool.New(context.Background(), connUrl)
if err != nil {
return nil, err
}
if err = conn.Ping(context.Background()); err != nil {
return nil, fmt.Errorf("connection check failed: %w", err)
}
return &postgresConnection{
conn: conn,
blockSizeInBytes: blockSizeInBytes,
}, nil
}
func (pc *postgresConnection) AllocateIndexInCurrentBlock(ctx context.Context, tenantId string) (*entity2.StatusData, error) {
tx, err := pc.conn.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
DeferrableMode: pgx.NotDeferrable,
})
if err != nil {
return nil, fmt.Errorf("could not start transaction: %w", err)
}
defer tx.Rollback(ctx)
if err != nil {
return nil, fmt.Errorf("error creating transaction: %w", err)
}
tableName, err := createTableName(tenantId)
if err != nil {
return nil, err
}
selectQuery := fmt.Sprintf("SELECT blockID, block, free FROM %s WHERE free > 0 FOR UPDATE LIMIT 1", tableName)
rows, err := tx.Query(ctx, selectQuery)
if err != nil {
return nil, fmt.Errorf("error while select current block from the database: %w", err)
}
// not optimized for performance cause of reflection
databaseRows, err := pgx.CollectRows(rows, pgx.RowToStructByName[entity2.Block])
if err != nil {
return nil, fmt.Errorf("error while collecting current block from rows: %w", err)
}
if len(databaseRows) == 0 {
// no current block -> create new one and allocate index
newBlock := entity2.NewBlock(pc.blockSizeInBytes)
index, err := newBlock.AllocateNextFreeIndex()
if err != nil {
return nil, fmt.Errorf("error allocating next free index from new block: %w", err)
}
insertQuery := fmt.Sprintf("INSERT INTO %s (block, free) VALUES ($1, $2) RETURNING blockID", tableName)
var blockId int
if err = tx.
QueryRow(ctx, insertQuery, newBlock.Block, newBlock.Free).
Scan(&blockId); err != nil {
return nil, fmt.Errorf("error inserting new block into the database: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("error commiting transaction: %w", err)
}
return entity2.NewStatusData(index, blockId), nil
}
// allocate index in current block
currentBlock := databaseRows[0]
index, err := currentBlock.AllocateNextFreeIndex()
if err != nil {
return nil, fmt.Errorf("error allocating next free index from current block: %w", err)
}
updateQuery := fmt.Sprintf("UPDATE %s%s SET block = $1, free = $2 WHERE blockID = $3", TablePrefix, tenantId)
if _, err := tx.Exec(ctx, updateQuery, currentBlock.Block, currentBlock.Free, currentBlock.BlockId); err != nil {
return nil, fmt.Errorf("error updating block in the database: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("error commiting transaction: %w", err)
}
return entity2.NewStatusData(index, currentBlock.BlockId), nil
}
func (pc *postgresConnection) RevokeCredentialInSpecifiedBlock(ctx context.Context, tenantId string, blockId int, index int) error {
tx, err := pc.conn.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
DeferrableMode: pgx.NotDeferrable,
})
if err != nil {
return fmt.Errorf("could not start transaction: %w", err)
}
defer tx.Rollback(ctx)
tableName, err := createTableName(tenantId)
if err != nil {
return err
}
fmt.Println(tableName)
selectQuery := fmt.Sprintf("SELECT blockID, block, free FROM %s WHERE blockID = $1 FOR UPDATE LIMIT 1", tableName)
rows, err := tx.Query(ctx, selectQuery, blockId)
if err != nil {
return fmt.Errorf("error while select specified block from the database: %w", err)
}
databaseRows, err := pgx.CollectRows(rows, pgx.RowToStructByName[entity2.Block])
if err != nil {
return fmt.Errorf("error while getting specified block from rows: %w", err)
}
if len(databaseRows) == 0 {
return fmt.Errorf("blockId %d does not exist in database", blockId)
}
specifiedBlock := databaseRows[0]
specifiedBlock.RevokeAtIndex(index)
updateQuery := fmt.Sprintf("UPDATE %s%s SET block = $1 WHERE blockID = $2", TablePrefix, tenantId)
_, err = tx.Exec(ctx, updateQuery, specifiedBlock.Block, specifiedBlock.BlockId)
if err != nil {
return fmt.Errorf("error updating block in the database: %w", err)
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("error commiting transaction: %w", err)
}
return nil
}
func (pc *postgresConnection) CreateTableForTenantIdIfNotExists(ctx context.Context, tenantId string) error {
tx, err := pc.conn.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
AccessMode: pgx.ReadWrite,
DeferrableMode: pgx.NotDeferrable,
})
if err != nil {
return fmt.Errorf("could not start transaction: %w", err)
}
defer tx.Rollback(ctx)
var n int64
exists := true
_, err = tx.Exec(ctx, "LOCK TABLE information_schema.tables IN EXCLUSIVE MODE")
if err != nil {
return fmt.Errorf("could not lock table: %w", err)
}
tableName, err := createTableName(tenantId)
if err != nil {
return err
}
const tableExistQuery = "SELECT 1 FROM information_schema.tables WHERE table_name = $1"
if err = tx.QueryRow(ctx, tableExistQuery, tableName).Scan(&n); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
exists = false
} else {
return fmt.Errorf("error query for table name: %w", err)
}
}
if !exists {
createTableQuery := fmt.Sprintf("CREATE TABLE %s (blockID SERIAL PRIMARY KEY, block BYTEA, free INT)", tableName)
_, err = tx.Exec(ctx, createTableQuery)
if err != nil {
return fmt.Errorf("could not create new table for tenantID: %w", err)
}
newBlock := entity2.NewBlock(pc.blockSizeInBytes)
insertQuery := fmt.Sprintf("INSERT INTO %s (block, free) VALUES ($1, $2)", tableName)
_, err = tx.Exec(ctx, insertQuery, newBlock.Block, newBlock.Free)
if err != nil {
return fmt.Errorf("error inserting new block into the database: %w", err)
}
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("error commiting transaction: %w", err)
}
return nil
}
func (pc *postgresConnection) Close() {
pc.conn.Close()
}
func createTableName(tenantId string) (string, error) {
tableName := TablePrefix + tenantId
isValid, err := regexp.Match("^[a-zA-Z0-9_]+$", []byte(tableName))
if err != nil {
return "", fmt.Errorf("error while checking tableName: %w", err)
}
if !isValid {
return "", fmt.Errorf("tableName '%s' is not valid", tableName)
}
return tableName, nil
}