Skip to content

Commit

Permalink
separatig logging and locking enabling control: WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
ryogrid committed Oct 10, 2023
1 parent 5201df5 commit 8e75e75
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 61 deletions.
20 changes: 16 additions & 4 deletions execution/executors/executor_test/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,8 @@ func TestDeleteWithSelctInsert(t *testing.T) {
bpm := buffer.NewBufferPoolManager(uint32(32), diskManager, log_mgr)
txn_mgr := access.NewTransactionManager(access.NewLockManager(access.REGULAR, access.DETECTION), log_mgr)
txn := txn_mgr.Begin(nil)
// TODO: (SDB) this is a hack to get around the fact that we don't have a recovery manager
txn.SetIsRecoveryPhase(true)

c := catalog.BootstrapCatalog(bpm, log_mgr, access.NewLockManager(access.REGULAR, access.PREVENTION), txn)

Expand Down Expand Up @@ -1112,6 +1114,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {
bpm := buffer.NewBufferPoolManager(uint32(32), diskManager, log_mgr)
txn_mgr := access.NewTransactionManager(access.NewLockManager(access.REGULAR, access.DETECTION), log_mgr)
txn := txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash
txn.SetIsRecoveryPhase(true)

c := catalog.BootstrapCatalog(bpm, log_mgr, access.NewLockManager(access.REGULAR, access.PREVENTION), txn)

Expand Down Expand Up @@ -1148,6 +1152,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {

fmt.Println("update and delete rows...")
txn = txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash...
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)

// update
Expand Down Expand Up @@ -1217,6 +1223,8 @@ func TestAbortWIthDeleteUpdate(t *testing.T) {
fmt.Println("select and check value after Abort...")

txn = txn_mgr.Begin(nil)
// TODO: (SDB) for avoiding crash...
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)

// check updated row
Expand Down Expand Up @@ -2379,11 +2387,13 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveRecovery(t *testing.T) {

// disable logging
log_mgr.DeactivateLogging()
txn.SetIsRecoveryPhase(true)

// do recovery from Log
log_recovery.Redo()
log_recovery.Undo()
log_recovery.Redo(txn)
log_recovery.Undo(txn)

txn.SetIsRecoveryPhase(false)
// reactivate logging
log_mgr.ActivateLogging()

Expand Down Expand Up @@ -2552,11 +2562,13 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveOccurOnRecovery(t *testing.T) {

// disable logging
log_mgr.DeactivateLogging()
txn.SetIsRecoveryPhase(true)

// do recovery from Log
log_recovery.Redo()
log_recovery.Undo()
log_recovery.Redo(txn)
log_recovery.Undo(txn)

txn.SetIsRecoveryPhase(false)
// reactivate logging
log_mgr.ActivateLogging()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,10 @@ func TestAbortWthDeleteUpdateUniqSkipListIndexCasePointScan(t *testing.T) {
executionEngine.Execute(deletePlanNode, executorContext)

log_mgr.DeactivateLogging()

// TODO: (SDB) for avoiding crash...
txn = txn_mgr.Begin(nil)
txn.SetIsRecoveryPhase(true)
executorContext.SetTransaction(txn)
fmt.Println("select and check value before Abort...")

// check updated row
Expand Down
33 changes: 17 additions & 16 deletions recovery/log_recovery/log_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ func (log_recovery *LogRecovery) DeserializeLogRecord(data []byte, log_record *r
* first return value: greatest LSN of log entries
* seconde return value: when redo operation occured, value is true
*/
func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
func (log_recovery *LogRecovery) Redo(txn *access.Transaction) (types.LSN, bool) {
// readLogLoopCnt := 0
// deserializeLoopCnt := 0

greatestLSN := 0
log_recovery.log_buffer = make([]byte, common.LogBufferSize)
var file_offset uint32 = 0
Expand Down Expand Up @@ -142,7 +143,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Insert_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
log_record.Insert_tuple.SetRID(&log_record.Insert_rid)
page_.InsertTuple(&log_record.Insert_tuple, log_recovery.log_manager, nil, nil)
page_.InsertTuple(&log_record.Insert_tuple, log_recovery.log_manager, nil, txn)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -151,7 +152,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
page_.ApplyDelete(&log_record.Delete_rid, nil, log_recovery.log_manager)
page_.ApplyDelete(&log_record.Delete_rid, txn, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -160,7 +161,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
page_.MarkDelete(&log_record.Delete_rid, nil, nil, log_recovery.log_manager)
page_.MarkDelete(&log_record.Delete_rid, txn, nil, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -169,7 +170,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(log_record.Delete_rid.GetPageId()))
if page_.GetLSN() < log_record.GetLSN() {
page_.RollbackDelete(&log_record.Delete_rid, nil, log_recovery.log_manager)
page_.RollbackDelete(&log_record.Delete_rid, txn, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -180,7 +181,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
if page_.GetLSN() < log_record.GetLSN() {
// UpdateTuple overwrites Old_tuple argument
// but it is no problem because log_record is read from log file again in Undo phase
page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, nil, nil, log_recovery.log_manager)
page_.UpdateTuple(&log_record.New_tuple, nil, nil, &log_record.Old_tuple, &log_record.Update_rid, txn, nil, log_recovery.log_manager)
page_.SetLSN(log_record.GetLSN())
isRedoOccured = true
}
Expand All @@ -197,7 +198,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
new_page := access.CastPageAsTablePage(log_recovery.buffer_pool_manager.NewPage())
page_id = new_page.GetPageId()
// fmt.Printf("page_id: %d\n", page_id)
new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, nil)
new_page.Init(page_id, log_record.Prev_page_id, log_recovery.log_manager, nil, txn)
//log_recovery.buffer_pool_manager.FlushPage(page_id)
log_recovery.buffer_pool_manager.UnpinPage(page_id, true)
}
Expand All @@ -215,7 +216,7 @@ func (log_recovery *LogRecovery) Redo() (types.LSN, bool) {
*iterate through active txn map and undo each operation
* when undo operation occured, return value becomes true
*/
func (log_recovery *LogRecovery) Undo() bool {
func (log_recovery *LogRecovery) Undo(txn *access.Transaction) bool {
var file_offset int
var log_record recovery.LogRecord
isUndoOccured := false
Expand Down Expand Up @@ -250,45 +251,45 @@ func (log_recovery *LogRecovery) Undo() bool {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Insert_rid).GetPageId()))
// fmt.Printf("insert log type, page lsn:%d, log lsn:%d", page.GetLSN(), log_record.GetLSN())
page_.ApplyDelete(&log_record.Insert_rid, nil, log_recovery.log_manager)
page_.ApplyDelete(&log_record.Insert_rid, txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Insert_rid).GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.APPLYDELETE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Delete_rid).GetPageId()))
log_record.Delete_tuple.SetRID(convRID(&log_record.Delete_rid))
page_.InsertTuple(&log_record.Delete_tuple, log_recovery.log_manager, nil, nil)
page_.InsertTuple(&log_record.Delete_tuple, log_recovery.log_manager, nil, txn)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Delete_rid).GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.MARKDELETE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Delete_rid).GetPageId()))
page_.RollbackDelete(convRID(&log_record.Delete_rid), nil, log_recovery.log_manager)
page_.RollbackDelete(convRID(&log_record.Delete_rid), txn, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Delete_rid).GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.ROLLBACKDELETE {
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(convRID(&log_record.Delete_rid).GetPageId()))
page_.MarkDelete(convRID(&log_record.Delete_rid), nil, nil, log_recovery.log_manager)
page_.MarkDelete(convRID(&log_record.Delete_rid), txn, nil, log_recovery.log_manager)
log_recovery.buffer_pool_manager.UnpinPage(convRID(&log_record.Delete_rid).GetPageId(), true)
isUndoOccured = true
} else if log_record.Log_record_type == recovery.UPDATE {
var org_update_rid page.RID = *convRID(&log_record.Update_rid)
page_ :=
access.CastPageAsTablePage(log_recovery.buffer_pool_manager.FetchPage(org_update_rid.GetPageId()))
is_updated, err, need_follow_tuple := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, convRID(&log_record.Update_rid), nil, nil, log_recovery.log_manager)
is_updated, err, need_follow_tuple := page_.UpdateTuple(&log_record.Old_tuple, nil, nil, &log_record.New_tuple, convRID(&log_record.Update_rid), txn, nil, log_recovery.log_manager)

if is_updated == false && err == access.ErrNotEnoughSpace {
// when rid is changed case (data is move to new page)

// first, delete original data
page_.ApplyDelete(&org_update_rid, nil, log_recovery.log_manager)
page_.ApplyDelete(&org_update_rid, txn, log_recovery.log_manager)

var new_rid *page.RID
var err2 error
// second, insert updated tuple to other page
for {
new_rid, err2 = page_.InsertTuple(need_follow_tuple, log_recovery.log_manager, nil, nil)
new_rid, err2 = page_.InsertTuple(need_follow_tuple, log_recovery.log_manager, nil, txn)
if err2 == nil || err2 == access.ErrEmptyTuple {
//page_.WUnlatch()
break
Expand All @@ -305,7 +306,7 @@ func (log_recovery *LogRecovery) Undo() bool {
newPage := access.CastPageAsTablePage(p)
page_.SetNextPageId(p.GetPageId())
currentPageId := page_.GetPageId()
newPage.Init(p.GetPageId(), currentPageId, log_recovery.log_manager, nil, nil)
newPage.Init(p.GetPageId(), currentPageId, log_recovery.log_manager, nil, txn)
log_recovery.buffer_pool_manager.UnpinPage(page_.GetPageId(), true)
page_ = newPage
}
Expand Down
16 changes: 12 additions & 4 deletions recovery/recovery_test/log_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func TestRedo(t *testing.T) {
testingpkg.AssertFalse(t, samehada_instance.GetLogManager().IsEnabledLogging(), "")
fmt.Println("Check if tuple is not in table before recovery")
txn = samehada_instance.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)
test_table = access.NewTableHeap(
samehada_instance.GetBufferPoolManager(),
samehada_instance.GetLogManager(),
Expand All @@ -222,6 +223,8 @@ func TestRedo(t *testing.T) {
testingpkg.AssertFalse(t, old_tuple1 != nil, "")
samehada_instance.GetTransactionManager().Commit(nil, txn)

txn = samehada_instance.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)
fmt.Println("Begin recovery")
log_recovery := log_recovery.NewLogRecovery(
samehada_instance.GetDiskManager(),
Expand All @@ -231,12 +234,13 @@ func TestRedo(t *testing.T) {
testingpkg.AssertFalse(t, samehada_instance.GetLogManager().IsEnabledLogging(), "")

fmt.Println("Redo underway...")
log_recovery.Redo()
log_recovery.Redo(txn)
fmt.Println("Undo underway...")
log_recovery.Undo()
log_recovery.Undo(txn)

fmt.Println("Check if recovery success")
txn = samehada_instance.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)

test_table = access.NewTableHeap(
samehada_instance.GetBufferPoolManager(),
Expand Down Expand Up @@ -345,6 +349,7 @@ func TestUndo(t *testing.T) {
fmt.Println("System restarted..")
samehada_instance = samehada.NewSamehadaInstance(t.Name(), common.BufferPoolMaxFrameNumForTest)
txn = samehada_instance.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)

test_table = access.NewTableHeap(
samehada_instance.GetBufferPoolManager(),
Expand Down Expand Up @@ -379,16 +384,19 @@ func TestUndo(t *testing.T) {

samehada_instance.GetLogManager().DeactivateLogging()
testingpkg.AssertFalse(t, samehada_instance.GetLogManager().IsEnabledLogging(), "")
txn = samehada_instance.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)

log_recovery.Redo()
log_recovery.Redo(txn)
fmt.Println("Redo underway...")
log_recovery.Undo()
log_recovery.Undo(txn)
fmt.Println("Undo underway...")

//samehada_instance.GetTransactionManager().Commit(txn)

fmt.Println("Check if failed txn is undo successfully")
txn = samehada_instance.GetTransactionManager().Begin(nil)
txn.SetIsRecoveryPhase(true)

fmt.Println("Check deleted tuple exists")
old_tuple1, _ = test_table.GetTuple(rid1, txn)
Expand Down
5 changes: 3 additions & 2 deletions samehada/samehada.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ func NewSamehadaDB(dbName string, memKBytes int) *SamehadaDB {
txn := shi.GetTransactionManager().Begin(nil)

shi.GetLogManager().DeactivateLogging()
txn.SetIsRecoveryPhase(true)

var c *catalog.Catalog
if isExistingDB {
log_recovery := log_recovery.NewLogRecovery(
shi.GetDiskManager(),
shi.GetBufferPoolManager(),
shi.GetLogManager())
greatestLSN, isRedoOccured := log_recovery.Redo()
isUndoOccured := log_recovery.Undo()
greatestLSN, isRedoOccured := log_recovery.Redo(txn)
isUndoOccured := log_recovery.Undo(txn)

dman := shi.GetDiskManager()
dman.GCLogFile()
Expand Down
15 changes: 15 additions & 0 deletions storage/access/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func (lock_manager *LockManager) LockShared(txn *Transaction, rid *page.RID) boo
//fmt.Printf("called LockShared, %v\n", rid1)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

if txn.IsRecoveryPhase() {
return true
}

slock_set := txn.GetSharedLockSet()
if txnID, ok := lock_manager.exclusive_lock_table[*rid]; ok {
if txnID == txn.GetTransactionId() {
Expand Down Expand Up @@ -196,6 +201,11 @@ func (lock_manager *LockManager) LockExclusive(txn *Transaction, rid *page.RID)
//fmt.Printf("called LockExclusive, %v\n", rid1)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

if txn.IsRecoveryPhase() {
return true
}

exlock_set := txn.GetExclusiveLockSet()
if txnID, ok := lock_manager.exclusive_lock_table[*rid]; ok {
if txnID == txn.GetTransactionId() {
Expand Down Expand Up @@ -228,6 +238,11 @@ func (lock_manager *LockManager) LockUpgrade(txn *Transaction, rid *page.RID) bo
//fmt.Printf("called LockUpgrade %v\n", rid1)
lock_manager.mutex.Lock()
defer lock_manager.mutex.Unlock()

if txn.IsRecoveryPhase() {
return true
}

//slock_set := txn.GetSharedLockSet()
elock_set := txn.GetExclusiveLockSet()
if txn.IsSharedLocked(rid) {
Expand Down
8 changes: 5 additions & 3 deletions storage/access/table_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,11 @@ func (t *TableHeap) GetTuple(rid *page.RID, txn *Transaction) (*tuple.Tuple, err
}()
}
}
if !txn.IsSharedLocked(rid) && !txn.IsExclusiveLocked(rid) && !t.lock_manager.LockShared(txn, rid) {
txn.SetState(ABORTED)
return nil, ErrGeneral
if !txn.IsRecoveryPhase() {
if !txn.IsSharedLocked(rid) && !txn.IsExclusiveLocked(rid) && !t.lock_manager.LockShared(txn, rid) {
txn.SetState(ABORTED)
return nil, ErrGeneral
}
}
page := CastPageAsTablePage(t.bpm.FetchPage(rid.GetPageId()))
page.RLatch()
Expand Down
Loading

0 comments on commit 8e75e75

Please sign in to comment.