Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent txn execution using frontend #25

Merged
merged 9 commits into from
Oct 10, 2023
26 changes: 19 additions & 7 deletions catalog/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package catalog

import (
"strings"
"sync"

"github.com/ryogrid/SamehadaDB/storage/index"
"github.com/ryogrid/SamehadaDB/storage/index/index_constants"
Expand Down Expand Up @@ -36,10 +37,12 @@ type Catalog struct {
tableIds map[uint32]*TableMetadata
tableNames map[string]*TableMetadata
// incrementation must be atomic
nextTableId uint32
tableHeap *access.TableHeap
Log_manager *recovery.LogManager
Lock_manager *access.LockManager
nextTableId uint32
tableHeap *access.TableHeap
Log_manager *recovery.LogManager
Lock_manager *access.LockManager
tableIdsMutex *sync.Mutex
tableNamesMutex *sync.Mutex
}

func Int32toBool(val int32) bool {
Expand All @@ -53,7 +56,7 @@ func Int32toBool(val int32) bool {
// BootstrapCatalog bootstrap the systems' catalogs on the first database initialization
func BootstrapCatalog(bpm *buffer.BufferPoolManager, log_manager *recovery.LogManager, lock_manager *access.LockManager, txn *access.Transaction) *Catalog {
tableCatalogHeap := access.NewTableHeap(bpm, log_manager, lock_manager, txn)
tableCatalog := &Catalog{bpm, make(map[uint32]*TableMetadata), make(map[string]*TableMetadata), 0, tableCatalogHeap, log_manager, lock_manager}
tableCatalog := &Catalog{bpm, make(map[uint32]*TableMetadata), make(map[string]*TableMetadata), 0, tableCatalogHeap, log_manager, lock_manager, new(sync.Mutex), new(sync.Mutex)}
tableCatalog.CreateTable("columns_catalog", ColumnsCatalogSchema(), txn)
return tableCatalog
}
Expand Down Expand Up @@ -107,20 +110,23 @@ func RecoveryCatalogFromCatalogPage(bpm *buffer.BufferPoolManager, log_manager *
tableNames[name] = tableMetadata
}

return &Catalog{bpm, tableIds, tableNames, 1, access.InitTableHeap(bpm, 0, log_manager, lock_manager), log_manager, lock_manager}

return &Catalog{bpm, tableIds, tableNames, 1, access.InitTableHeap(bpm, 0, log_manager, lock_manager), log_manager, lock_manager, new(sync.Mutex), new(sync.Mutex)}
}

func (c *Catalog) GetTableByName(table string) *TableMetadata {
// note: alphabets on table name is stored in lowercase
tableName := strings.ToLower(table)
c.tableNamesMutex.Lock()
defer c.tableNamesMutex.Unlock()
if table_, ok := c.tableNames[tableName]; ok {
return table_
}
return nil
}

func (c *Catalog) GetTableByOID(oid uint32) *TableMetadata {
c.tableIdsMutex.Lock()
defer c.tableIdsMutex.Unlock()
if table, ok := c.tableIds[oid]; ok {
return table
}
Expand All @@ -129,6 +135,8 @@ func (c *Catalog) GetTableByOID(oid uint32) *TableMetadata {

func (c *Catalog) GetAllTables() []*TableMetadata {
ret := make([]*TableMetadata, 0)
c.tableIdsMutex.Lock()
defer c.tableIdsMutex.Unlock()
for key, _ := range c.tableIds {
ret = append(ret, c.tableIds[key])
}
Expand Down Expand Up @@ -163,8 +171,12 @@ func (c *Catalog) CreateTable(name string, schema_ *schema.Schema, txn *access.T

tableMetadata := NewTableMetadata(schema_, name_, tableHeap, oid)

c.tableIdsMutex.Lock()
c.tableIds[oid] = tableMetadata
c.tableIdsMutex.Unlock()
c.tableNamesMutex.Lock()
c.tableNames[name_] = tableMetadata
c.tableNamesMutex.Unlock()
c.insertTable(tableMetadata, txn)

return tableMetadata
Expand Down
2 changes: 1 addition & 1 deletion common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var LogTimeout time.Duration
const EnableDebug bool = false //true

// use virtual storage or not
const EnableOnMemStorage = true //false
const EnableOnMemStorage = true

// when this is true, virtual storage use is suppressed
// for test case which can't work with virtual storage
Expand Down
20 changes: 16 additions & 4 deletions execution/executors/executor_test/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,8 @@ func TestDeleteWithSelctInsert(t *testing.T) {
bpm := buffer.NewBufferPoolManager(uint32(32), diskManager, log_mgr)
txn_mgr := access.NewTransactionManager(access.NewLockManager(access.REGULAR, access.DETECTION), log_mgr)
txn := txn_mgr.Begin(nil)
// TODO: (SDB) this is a hack to get around the fact that we don't have a recovery manager
txn.SetIsRecoveryPhase(true)

c := catalog.BootstrapCatalog(bpm, log_mgr, access.NewLockManager(access.REGULAR, access.PREVENTION), txn)

Expand Down Expand Up @@ -1112,6 +1114,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {
bpm := buffer.NewBufferPoolManager(uint32(32), diskManager, log_mgr)
txn_mgr := access.NewTransactionManager(access.NewLockManager(access.REGULAR, access.DETECTION), log_mgr)
txn := txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash
txn.SetIsRecoveryPhase(true)

c := catalog.BootstrapCatalog(bpm, log_mgr, access.NewLockManager(access.REGULAR, access.PREVENTION), txn)

Expand Down Expand Up @@ -1148,6 +1152,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {

fmt.Println("update and delete rows...")
txn = txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash...
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)

// update
Expand Down Expand Up @@ -1217,6 +1223,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {
fmt.Println("select and check value after Abort...")

txn = txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash...
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)

// check updated row
Expand Down Expand Up @@ -2379,11 +2387,13 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveRecovery(t *testing.T) {

// disable logging
log_mgr.DeactivateLogging()
txn.SetIsRecoveryPhase(true)

// do recovery from Log
log_recovery.Redo()
log_recovery.Undo()
log_recovery.Redo(txn)
log_recovery.Undo(txn)

txn.SetIsRecoveryPhase(false)
// reactivate logging
log_mgr.ActivateLogging()

Expand Down Expand Up @@ -2552,11 +2562,13 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveOccurOnRecovery(t *testing.T) {

// disable logging
log_mgr.DeactivateLogging()
txn.SetIsRecoveryPhase(true)

// do recovery from Log
log_recovery.Redo()
log_recovery.Undo()
log_recovery.Redo(txn)
log_recovery.Undo(txn)

txn.SetIsRecoveryPhase(false)
// reactivate logging
log_mgr.ActivateLogging()

Expand Down
Loading
Loading