Skip to content

Commit

Permalink
Merge pull request #25 from ryogrid/concurrent-txn-exec-using-frontend
Browse files Browse the repository at this point in the history
Concurrent txn execution using frontend
  • Loading branch information
ryogrid authored Oct 10, 2023
2 parents e1cbb40 + 04676f7 commit d09905e
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 310 deletions.
26 changes: 19 additions & 7 deletions catalog/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package catalog

import (
"strings"
"sync"

"github.com/ryogrid/SamehadaDB/storage/index"
"github.com/ryogrid/SamehadaDB/storage/index/index_constants"
Expand Down Expand Up @@ -36,10 +37,12 @@ type Catalog struct {
tableIds map[uint32]*TableMetadata
tableNames map[string]*TableMetadata
// incrementation must be atomic
nextTableId uint32
tableHeap *access.TableHeap
Log_manager *recovery.LogManager
Lock_manager *access.LockManager
nextTableId uint32
tableHeap *access.TableHeap
Log_manager *recovery.LogManager
Lock_manager *access.LockManager
tableIdsMutex *sync.Mutex
tableNamesMutex *sync.Mutex
}

func Int32toBool(val int32) bool {
Expand All @@ -53,7 +56,7 @@ func Int32toBool(val int32) bool {
// BootstrapCatalog bootstrap the systems' catalogs on the first database initialization
func BootstrapCatalog(bpm *buffer.BufferPoolManager, log_manager *recovery.LogManager, lock_manager *access.LockManager, txn *access.Transaction) *Catalog {
tableCatalogHeap := access.NewTableHeap(bpm, log_manager, lock_manager, txn)
tableCatalog := &Catalog{bpm, make(map[uint32]*TableMetadata), make(map[string]*TableMetadata), 0, tableCatalogHeap, log_manager, lock_manager}
tableCatalog := &Catalog{bpm, make(map[uint32]*TableMetadata), make(map[string]*TableMetadata), 0, tableCatalogHeap, log_manager, lock_manager, new(sync.Mutex), new(sync.Mutex)}
tableCatalog.CreateTable("columns_catalog", ColumnsCatalogSchema(), txn)
return tableCatalog
}
Expand Down Expand Up @@ -107,20 +110,23 @@ func RecoveryCatalogFromCatalogPage(bpm *buffer.BufferPoolManager, log_manager *
tableNames[name] = tableMetadata
}

return &Catalog{bpm, tableIds, tableNames, 1, access.InitTableHeap(bpm, 0, log_manager, lock_manager), log_manager, lock_manager}

return &Catalog{bpm, tableIds, tableNames, 1, access.InitTableHeap(bpm, 0, log_manager, lock_manager), log_manager, lock_manager, new(sync.Mutex), new(sync.Mutex)}
}

func (c *Catalog) GetTableByName(table string) *TableMetadata {
// note: alphabets on table name is stored in lowercase
tableName := strings.ToLower(table)
c.tableNamesMutex.Lock()
defer c.tableNamesMutex.Unlock()
if table_, ok := c.tableNames[tableName]; ok {
return table_
}
return nil
}

func (c *Catalog) GetTableByOID(oid uint32) *TableMetadata {
c.tableIdsMutex.Lock()
defer c.tableIdsMutex.Unlock()
if table, ok := c.tableIds[oid]; ok {
return table
}
Expand All @@ -129,6 +135,8 @@ func (c *Catalog) GetTableByOID(oid uint32) *TableMetadata {

func (c *Catalog) GetAllTables() []*TableMetadata {
ret := make([]*TableMetadata, 0)
c.tableIdsMutex.Lock()
defer c.tableIdsMutex.Unlock()
for key, _ := range c.tableIds {
ret = append(ret, c.tableIds[key])
}
Expand Down Expand Up @@ -163,8 +171,12 @@ func (c *Catalog) CreateTable(name string, schema_ *schema.Schema, txn *access.T

tableMetadata := NewTableMetadata(schema_, name_, tableHeap, oid)

c.tableIdsMutex.Lock()
c.tableIds[oid] = tableMetadata
c.tableIdsMutex.Unlock()
c.tableNamesMutex.Lock()
c.tableNames[name_] = tableMetadata
c.tableNamesMutex.Unlock()
c.insertTable(tableMetadata, txn)

return tableMetadata
Expand Down
2 changes: 1 addition & 1 deletion common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var LogTimeout time.Duration
const EnableDebug bool = false //true

// use virtual storage or not
const EnableOnMemStorage = true //false
const EnableOnMemStorage = true

// when this is true, virtual storage use is suppressed
// for test case which can't work with virtual storage
Expand Down
20 changes: 16 additions & 4 deletions execution/executors/executor_test/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,8 @@ func TestDeleteWithSelctInsert(t *testing.T) {
bpm := buffer.NewBufferPoolManager(uint32(32), diskManager, log_mgr)
txn_mgr := access.NewTransactionManager(access.NewLockManager(access.REGULAR, access.DETECTION), log_mgr)
txn := txn_mgr.Begin(nil)
// TODO: (SDB) this is a hack to get around the fact that we don't have a recovery manager
txn.SetIsRecoveryPhase(true)

c := catalog.BootstrapCatalog(bpm, log_mgr, access.NewLockManager(access.REGULAR, access.PREVENTION), txn)

Expand Down Expand Up @@ -1112,6 +1114,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {
bpm := buffer.NewBufferPoolManager(uint32(32), diskManager, log_mgr)
txn_mgr := access.NewTransactionManager(access.NewLockManager(access.REGULAR, access.DETECTION), log_mgr)
txn := txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash
txn.SetIsRecoveryPhase(true)

c := catalog.BootstrapCatalog(bpm, log_mgr, access.NewLockManager(access.REGULAR, access.PREVENTION), txn)

Expand Down Expand Up @@ -1148,6 +1152,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {

fmt.Println("update and delete rows...")
txn = txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash...
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)

// update
Expand Down Expand Up @@ -1217,6 +1223,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {
fmt.Println("select and check value after Abort...")

txn = txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash...
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)

// check updated row
Expand Down Expand Up @@ -2379,11 +2387,13 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveRecovery(t *testing.T) {

// disable logging
log_mgr.DeactivateLogging()
txn.SetIsRecoveryPhase(true)

// do recovery from Log
log_recovery.Redo()
log_recovery.Undo()
log_recovery.Redo(txn)
log_recovery.Undo(txn)

txn.SetIsRecoveryPhase(false)
// reactivate logging
log_mgr.ActivateLogging()

Expand Down Expand Up @@ -2552,11 +2562,13 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveOccurOnRecovery(t *testing.T) {

// disable logging
log_mgr.DeactivateLogging()
txn.SetIsRecoveryPhase(true)

// do recovery from Log
log_recovery.Redo()
log_recovery.Undo()
log_recovery.Redo(txn)
log_recovery.Undo(txn)

txn.SetIsRecoveryPhase(false)
// reactivate logging
log_mgr.ActivateLogging()

Expand Down
Loading

0 comments on commit d09905e

Please sign in to comment.