Skip to content

Commit

Permalink
Merge pull request #33 from ryogrid/shutdown-with-SIGINT-signal-impl
Browse files Browse the repository at this point in the history
implemented server shutdown with SIGINT and avoiding needless index data reconstruction of index data at relaunch
  • Loading branch information
ryogrid authored Oct 29, 2023
2 parents ad2449f + 8e3a7e7 commit 27cb040
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 38 deletions.
2 changes: 1 addition & 1 deletion lib/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var LogTimeout time.Duration
const EnableDebug bool = false //true

// use virtual storage or not
const EnableOnMemStorage = true
const EnableOnMemStorage = false //true

// when this is true, virtual storage use is suppressed
// for test case which can't work with virtual storage
Expand Down
12 changes: 6 additions & 6 deletions lib/container/skip_list/skip_list_test/skip_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestSerializationOfSkipLisBlockPage(t *testing.T) {
testingpkg.SimpleAssert(t, entry.Key.CompareEquals(types.NewVarchar("abcdeff")))
testingpkg.SimpleAssert(t, entry.Value == 12345)

shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
}

func TestSerializationOfSkipLisHeaderPage(t *testing.T) {
Expand All @@ -76,7 +76,7 @@ func TestSerializationOfSkipLisHeaderPage(t *testing.T) {
testingpkg.SimpleAssert(t, hpage.GetListStartPageId() == 7)
testingpkg.SimpleAssert(t, hpage.GetKeyType() == types.Varchar)

shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
}

func TestInnerInsertDeleteOfBlockPageSimple(t *testing.T) {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestInnerInsertDeleteOfBlockPageSimple(t *testing.T) {

bpm.UnpinPage(bpage2.GetPageId(), true)

shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
}

func TestBSearchOfSkipLisBlockPage(t *testing.T) {
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestBSearchOfSkipLisBlockPage(t *testing.T) {
}
}

shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
}

func TestBSearchOfSkipLisBlockPage2(t *testing.T) {
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestBSearchOfSkipLisBlockPage2(t *testing.T) {
}
bpage.WUnlatch()

shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
}

func confirmSkipListContent(t *testing.T, sl *skip_list.SkipList, step int32) int32 {
Expand Down Expand Up @@ -432,7 +432,7 @@ func TestSkipListInsertAndDeleteAll(t *testing.T) {
testingpkg.SimpleAssert(t, math.MaxUint64 == res)
}
shi.Shutdown(false)
shi.Shutdown(ShutdownPatternCloseFiles)
}
func TestSkipListItr(t *testing.T) {
Expand Down
20 changes: 10 additions & 10 deletions lib/execution/executors/executor_test/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,7 @@ func TestSimpleAggregation(t *testing.T) {
testingpkg.Assert(t, tuple_ == nil && done == true && err == nil, "second call of AggregationExecutor::Next() failed")

txn_mgr.Commit(nil, txn)
shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)
}

func TestSimpleGroupByAggregation(t *testing.T) {
Expand Down Expand Up @@ -1889,7 +1889,7 @@ func TestSimpleGroupByAggregation(t *testing.T) {
}

txn_mgr.Commit(nil, txn)
shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)
}

func TestSeqScanWithMultiItemPredicate(t *testing.T) {
Expand Down Expand Up @@ -1963,7 +1963,7 @@ func TestSeqScanWithMultiItemPredicate(t *testing.T) {

txn_mgr.Commit(nil, txn)

shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)
}

func TestInsertAndSpecifiedColumnUpdate(t *testing.T) {
Expand Down Expand Up @@ -2138,7 +2138,7 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveCase(t *testing.T) {
testingpkg.Assert(t, types.NewInteger(99).CompareEquals(results[0].GetValue(outSchema, 0)), "value should be 99")
testingpkg.Assert(t, types.NewVarchar("updated_xxxxxxxxxxxxxxxxxxxxxxxxx").CompareEquals(results[0].GetValue(outSchema, 1)), "value should be 'updated_xxxxxxxxxxxxxxxxxxxxxxxxx'")

shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)
}

func TestInsertAndSpecifiedColumnUpdatePageMoveRecovery(t *testing.T) {
Expand Down Expand Up @@ -2206,7 +2206,7 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveRecovery(t *testing.T) {
executionEngine.Execute(updatePlanNode, executorContext)

// system crash before finish txn
shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)

// restart system
shi = samehada.NewSamehadaInstance(t.Name(), common.BufferPoolMaxFrameNumForTest)
Expand Down Expand Up @@ -2269,7 +2269,7 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveRecovery(t *testing.T) {
testingpkg.Assert(t, types.NewInteger(99).CompareEquals(results[0].GetValue(outSchema, 0)), "value should be 99")
testingpkg.Assert(t, types.NewVarchar("k").CompareEquals(results[0].GetValue(outSchema, 1)), "value should be 'k'")

shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)

common.TempSuppressOnMemStorage = false
common.TempSuppressOnMemStorageMutex.Unlock()
Expand Down Expand Up @@ -2381,7 +2381,7 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveOccurOnRecovery(t *testing.T) {
txn_mgr.Commit(nil, txn2)

// system crash before finish "txn"
shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)

// restart system
shi = samehada.NewSamehadaInstance(t.Name(), common.BufferPoolMaxFrameNumForTest)
Expand Down Expand Up @@ -2444,7 +2444,7 @@ func TestInsertAndSpecifiedColumnUpdatePageMoveOccurOnRecovery(t *testing.T) {
testingpkg.Assert(t, types.NewInteger(180).CompareEquals(results[0].GetValue(outSchema, 0)), "value should be 180")
testingpkg.Assert(t, types.NewVarchar("kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk").CompareEquals(results[0].GetValue(outSchema, 1)), "value should be 'kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkk'")

shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)

common.TempSuppressOnMemStorage = false
common.TempSuppressOnMemStorageMutex.Unlock()
Expand Down Expand Up @@ -2556,7 +2556,7 @@ func TestSimpleSeqScanAndOrderBy(t *testing.T) {
testingpkg.Assert(t, types.NewVarchar("daylight").CompareEquals(results[2].GetValue(scan_schema, 1)), "value should be 'daylight'")

txn_mgr.Commit(nil, txn)
shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)
}

func TestSimpleSetNullToVarchar(t *testing.T) {
Expand Down Expand Up @@ -2640,7 +2640,7 @@ func TestSimpleSetNullToVarchar(t *testing.T) {
testingpkg.Assert(t, types.NewVarchar("daylight").CompareEquals(results[2].GetValue(scan_schema, 1)), "value should be 'daylight'")

txn_mgr.Commit(nil, txn)
shi.Shutdown(true)
shi.Shutdown(samehada.ShutdownPatternRemoveFiles)
}

func TestInsertNullValueAndSeqScanWithNullComparison(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions lib/recovery/recovery_test/log_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestRedo(t *testing.T) {
fmt.Println("Tearing down the system..")

common.TempSuppressOnMemStorage = false
samehada_instance.Shutdown(true)
samehada_instance.Shutdown(samehada.ShutdownPatternRemoveFiles)
common.TempSuppressOnMemStorageMutex.Unlock()
}

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestUndo(t *testing.T) {

fmt.Println("System crash before commit")
// delete samehada_instance
samehada_instance.Shutdown(false)
samehada_instance.Shutdown(samehada.ShutdownPatternCloseFiles)

fmt.Println("System restarted..")
samehada_instance = samehada.NewSamehadaInstance(t.Name(), common.BufferPoolMaxFrameNumForTest)
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestUndo(t *testing.T) {
fmt.Println("Tearing down the system..")

common.TempSuppressOnMemStorage = false
samehada_instance.Shutdown(true)
samehada_instance.Shutdown(samehada.ShutdownPatternRemoveFiles)
common.TempSuppressOnMemStorageMutex.Unlock()
}

Expand Down Expand Up @@ -398,7 +398,7 @@ func TestCheckpoint(t *testing.T) {
fmt.Println("Tearing down the system..")

common.TempSuppressOnMemStorage = false
samehada_instance.Shutdown(true)
samehada_instance.Shutdown(samehada.ShutdownPatternRemoveFiles)
common.TempSuppressOnMemStorageMutex.Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion lib/samehada/request_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (reqManager *RequestManager) Run() {
}
}

// check stop signal or new request
// check stop signal_handle or new request
if !reqManager.isExecutionActive {
break
}
Expand Down
19 changes: 11 additions & 8 deletions lib/samehada/samehada.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ func ReconstructAllIndexData(c *catalog.Catalog, dman disk.DiskManager, txn *acc

func NewSamehadaDB(dbName string, memKBytes int) *SamehadaDB {
isExistingDB := false
isExistingLog := false

if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage {
isExistingDB = samehada_util.FileExists(dbName + ".db")
isExistingLog = samehada_util.FileExists(dbName + ".log")
}

bpoolSize := math.Floor(float64(memKBytes*1024) / float64(common.PageSize))
Expand All @@ -125,16 +127,17 @@ func NewSamehadaDB(dbName string, memKBytes int) *SamehadaDB {
shi.GetDiskManager(),
shi.GetBufferPoolManager(),
shi.GetLogManager())
greatestLSN, isRedoOccured := log_recovery.Redo(txn)
isUndoOccured := log_recovery.Undo(txn)
greatestLSN, _ := log_recovery.Redo(txn)
log_recovery.Undo(txn)

dman := shi.GetDiskManager()
dman.GCLogFile()
shi.GetLogManager().SetNextLSN(greatestLSN + 1)

c = catalog.RecoveryCatalogFromCatalogPage(shi.GetBufferPoolManager(), shi.GetLogManager(), shi.GetLockManager(), txn)

if isRedoOccured || isUndoOccured {
// db file exists but log file doesn't exist case means gracefully shutdown. so this block is passed
if isExistingLog {
// index date reloading/recovery is not implemented yet
// so when db did not exit graceful, all index data should be recounstruct
// (hash index uses already allocated pages but skip list index deserts these...)
Expand Down Expand Up @@ -239,11 +242,11 @@ func (sdb *SamehadaDB) Shutdown() {
sdb.statistics_updator.StopStatsUpdateTh()
sdb.shi_.GetCheckpointManager().StopCheckpointTh()
sdb.request_manager.StopTh()
isSuccess := sdb.shi_.GetBufferPoolManager().FlushAllDirtyPages()
if !isSuccess {
panic("flush all dirty pages failed!")
}
sdb.shi_.Shutdown(false)
//isSuccess := sdb.shi_.GetBufferPoolManager().FlushAllDirtyPages()
//if !isSuccess {
// panic("flush all dirty pages failed!")
//}
sdb.shi_.Shutdown(ShutdownPatternRemoveLogOnly)
}

// no flush of page buffer
Expand Down
27 changes: 22 additions & 5 deletions lib/samehada/samehada_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/ryogrid/SamehadaDB/lib/storage/disk"
)

type ShutdownPattern int

const (
ShutdownPatternRemoveFiles ShutdownPattern = iota
ShutdownPatternCloseFiles
ShutdownPatternRemoveLogOnly
)

type SamehadaInstance struct {
disk_manager disk.DiskManager
log_manager *recovery.LogManager
Expand Down Expand Up @@ -70,19 +78,28 @@ func (si *SamehadaInstance) GetCheckpointManager() *concurrency.CheckpointManage
}

// functionality is Flushing dirty pages, shutdown of DiskManager and action around DB/Log files
func (si *SamehadaInstance) Shutdown(IsRemoveFiles bool) {
if IsRemoveFiles {
func (si *SamehadaInstance) Shutdown(shutdownPat ShutdownPattern) {
switch shutdownPat {
case ShutdownPatternRemoveFiles:
//close
si.disk_manager.ShutDown()
//remove
si.disk_manager.RemoveDBFile()
si.disk_manager.RemoveLogFile()
} else {
case ShutdownPatternCloseFiles:
si.log_manager.Flush()
// TODO: (SDB) flush only dirty pages
si.bpm.FlushAllPages()
si.bpm.FlushAllDirtyPages()
// close only
si.disk_manager.ShutDown()
case ShutdownPatternRemoveLogOnly:
si.log_manager.Flush()
si.bpm.FlushAllDirtyPages()
// close files
si.disk_manager.ShutDown()

si.disk_manager.RemoveLogFile()
default:
panic("invalid shutdown pattern")
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/storage/index/index_test/hash_table_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestRecounstructionOfHashIndex(t *testing.T) {
}

shi.GetTransactionManager().Commit(nil, txn)
shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)

// ----------- check recovery includes index data ----------

Expand Down Expand Up @@ -239,6 +239,6 @@ func TestRecounstructionOfHashIndex(t *testing.T) {
shi.GetTransactionManager().Commit(nil, txn)

common.TempSuppressOnMemStorage = false
shi.Shutdown(false)
shi.Shutdown(samehada.ShutdownPatternCloseFiles)
common.TempSuppressOnMemStorageMutex.Unlock()
}
27 changes: 26 additions & 1 deletion server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"github.com/ant0ine/go-json-rest/rest"
"github.com/ryogrid/SamehadaDB/lib/samehada"
"github.com/ryogrid/SamehadaDB/server/signal_handle"
"log"
"net/http"
"os"
)

type QueryInput struct {
Expand All @@ -22,8 +24,14 @@ type QueryOutput struct {
}

var db = samehada.NewSamehadaDB("default", 5000) //5MB
var IsStopped = false

func postQuery(w rest.ResponseWriter, req *rest.Request) {
if signal_handle.IsStopped {
rest.Error(w, "Server is stopped", http.StatusGone)
return
}

input := QueryInput{}
err := req.DecodeJsonPayload(&input)

Expand Down Expand Up @@ -54,7 +62,7 @@ func postQuery(w rest.ResponseWriter, req *rest.Request) {
})
}

func main() {
func launchDBAndListen() {
api := rest.NewApi()

// the Middleware stack
Expand Down Expand Up @@ -87,3 +95,20 @@ func main() {
api.MakeHandler(),
))
}

func main() {
exitNotifyCh := make(chan bool, 1)

// start signal handler thread
go signal_handle.SignalHandlerTh(db, &exitNotifyCh)

// start server
go launchDBAndListen()

// wait shutdown operation finished notification
<-exitNotifyCh

fmt.Println("Server is stopped gracefully")
// exit process
os.Exit(0)
}
Loading

0 comments on commit 27cb040

Please sign in to comment.