Skip to content

Commit

Permalink
implementing RequestManager: WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
ryogrid committed Oct 14, 2023
1 parent bd740a5 commit 19ce6b1
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 10 deletions.
2 changes: 2 additions & 0 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
// probability used for determin node level on SkipList
SkipListProb = 0.5 //0.25
ActiveLogKindSetting = INFO | NOT_ABORABLE_TXN_FEATURE //| COMMIT_ABORT_HANDLE_INFO | NOT_ABORABLE_TXN_FEATURE | DEBUGGING | RDB_OP_FUNC_CALL // | DEBUG_INFO //| BUFFER_INTERNAL_STATE //| DEBUGGING | DEBUG_INFO //| PIN_COUNT_ASSERT //DEBUG_INFO_DETAIL //DEBUGGING
KernelThreadNum = 24
MaxTxnThreadNum = KernelThreadNum * 1
)

type TxnID int32 // transaction id type
Expand Down
97 changes: 97 additions & 0 deletions samehada/request_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package samehada

import (
"github.com/ryogrid/SamehadaDB/common"
"sync"
)

type queryRequest struct {
reqId uint64
queryStr *string
callerCh *chan *reqResult
}

type RequestManager struct {
sdb *SamehadaDB
nextReqId uint64
execQue []*queryRequest
queMutex *sync.Mutex
curExectingReqNum uint64
inCh *chan *reqResult
isExecutionActive bool
}

func NewRequestManager(sdb *SamehadaDB) *RequestManager {
ch := make(chan *reqResult)
return &RequestManager{sdb, 0, make([]*queryRequest, 0), new(sync.Mutex), 0, &ch, true}
}

func (reqManager *RequestManager) AppendRequest(queryStr *string) *chan *reqResult {
reqManager.queMutex.Lock()

qr := new(queryRequest)
qr.reqId = reqManager.nextReqId
reqManager.nextReqId++
qr.queryStr = queryStr

retCh := make(chan *reqResult)

qr.callerCh = &retCh

reqManager.execQue = append(reqManager.execQue, qr)
reqManager.queMutex.Unlock()

// wake up execution thread
*reqManager.inCh <- nil

return &retCh
}

// caller must having lock of queMutex
func (reqManager *RequestManager) RetrieveRequest() *queryRequest {
retVal := reqManager.execQue[0]
reqManager.execQue = reqManager.execQue[1:]
return retVal
}

func (reqManager *RequestManager) StartTh() {
go reqManager.Run()
}

func (reqManager *RequestManager) StopTh() {
reqManager.isExecutionActive = false
*reqManager.inCh <- nil
}

// caller must having lock of queMutex
func (reqManager *RequestManager) executeQuedTxns() {
qr := reqManager.RetrieveRequest()
go reqManager.sdb.executeSQLForTxnTh(reqManager.inCh, qr)
reqManager.curExectingReqNum++
}

func (reqManager *RequestManager) Run() {
for {
recvVal := <-*reqManager.inCh
if recvVal == nil { // stop signal or new request
if !reqManager.isExecutionActive {
break
}
reqManager.queMutex.Lock()
if len(reqManager.execQue) > 0 && reqManager.curExectingReqNum < common.MaxTxnThreadNum {
reqManager.executeQuedTxns()
}
reqManager.queMutex.Unlock()
} else { // receive result
reqManager.queMutex.Lock()
reqManager.curExectingReqNum--

if recvVal.err != nil {
// TODO: (SDB) [PARA] appropriate handling of error is needed
panic("error on execution")
}
reqManager.queMutex.Unlock()
*recvVal.callerCh <- recvVal
}
}
}
37 changes: 30 additions & 7 deletions samehada/samehada.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type SamehadaDB struct {
//chkpntMgr *concurrency.CheckpointManager
//planner_ planner.Planner
statistics_updator *concurrency.StatisticsUpdater
request_manager *RequestManager
}

type reqResult struct {
err error
result [][]interface{}
callerCh *chan *reqResult
}

func reconstructIndexDataOfATbl(t *catalog.TableMetadata, c *catalog.Catalog, dman disk.DiskManager, txn *access.Transaction) {
Expand Down Expand Up @@ -147,22 +154,36 @@ func NewSamehadaDB(dbName string, memKBytes int) *SamehadaDB {

//chkpntMgr := concurrency.NewCheckpointManager(shi.GetTransactionManager(), shi.GetLogManager(), shi.GetBufferPoolManager())
//chkpntMgr.StartCheckpointTh()
shi.GetCheckpointManager().StartCheckpointTh()

//shi.GetCheckpointManager().StartCheckpointTh()

// statics data is updated periodically by this thread with full scan of all tables
// this may be not good implementation of statistics, but it is enough for now...
statUpdater := concurrency.NewStatisticsUpdater(shi.GetTransactionManager(), c)
statUpdater.StartStaticsUpdaterTh()

return &SamehadaDB{shi, c, exec_engine, statUpdater}
//statUpdater.StartStaticsUpdaterTh()

ret := &SamehadaDB{shi, c, exec_engine, statUpdater, nil}
tmpReqMgr := NewRequestManager(ret)
ret.request_manager = tmpReqMgr
ret.request_manager.StartTh()

return ret
}

func (sdb *SamehadaDB) ExecuteSQL(sqlStr string) (error, [][]interface{}) {
err, results := sdb.ExecuteSQLRetValues(sqlStr)
func (sdb *SamehadaDB) executeSQLForTxnTh(ch *chan *reqResult, qr *queryRequest) {
err, results := sdb.ExecuteSQLRetValues(*qr.queryStr)
if err != nil {
return err, nil
*ch <- &reqResult{err, nil, qr.callerCh}
return
}
return nil, ConvValueListToIFs(results)
*ch <- &reqResult{nil, ConvValueListToIFs(results), qr.callerCh}
}

func (sdb *SamehadaDB) ExecuteSQL(sqlStr string) (error, [][]interface{}) {
ch := sdb.request_manager.AppendRequest(&sqlStr)
ret := <-*ch
return ret.err, ret.result
}

var PlanCreationErr error = errors.New("plan creation error")
Expand Down Expand Up @@ -218,6 +239,7 @@ func (sdb *SamehadaDB) Shutdown() {
// set a flag which is checked by checkpointing thread
sdb.statistics_updator.StopStatsUpdateTh()
sdb.shi_.GetCheckpointManager().StopCheckpointTh()
sdb.request_manager.StopTh()
isSuccess := sdb.shi_.GetBufferPoolManager().FlushAllDirtyPages()
if !isSuccess {
panic("flush all dirty pages failed!")
Expand All @@ -230,6 +252,7 @@ func (sdb *SamehadaDB) ShutdownForTescase() {
// set a flag which is checked by checkpointing thread
sdb.shi_.GetCheckpointManager().StopCheckpointTh()
sdb.statistics_updator.StopStatsUpdateTh()
sdb.request_manager.StopTh()
//sdb.shi_.Shutdown(false)
sdb.shi_.CloseFilesForTesting()
}
Expand Down
6 changes: 3 additions & 3 deletions samehada/samehada_test/samehada_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestParallelQueryIssue(t *testing.T) {
insCh := make(chan int32)
for ii := 0; ii < opTimes; ii++ {
go func(val int32) {
err, _ = db.ExecuteSQLRetValues(fmt.Sprintf("INSERT INTO k_v_list(k, v) VALUES (%d, %d);", val, val))
err, _ = db.ExecuteSQL(fmt.Sprintf("INSERT INTO k_v_list(k, v) VALUES (%d, %d);", val, val))
insCh <- val
}(queryVals[ii])
//testingpkg.Assert(t, err == nil, "failed to insert val: "+strconv.Itoa(int(queryVals[ii])))
Expand Down Expand Up @@ -336,14 +336,14 @@ func TestParallelQueryIssue(t *testing.T) {
}

go func(queryVal int32) {
err_, results := db.ExecuteSQLRetValues(fmt.Sprintf("SELECT v FROM k_v_list WHERE k = %d;", queryVal))
err_, results := db.ExecuteSQL(fmt.Sprintf("SELECT v FROM k_v_list WHERE k = %d;", queryVal))
if err != nil {
fmt.Println(err_)
ch <- [2]int32{queryVal, -1}
return
}

gotValue := (*results[0][0]).ToInteger()
gotValue := results[0][0].(int32)
ch <- [2]int32{queryVal, gotValue}
}(queryVals[ii])

Expand Down

0 comments on commit 19ce6b1

Please sign in to comment.