Skip to content

Commit

Permalink
implementing RequestManager: no abort case passed (opTymes=10000, on …
Browse files Browse the repository at this point in the history
…memory).
  • Loading branch information
ryogrid committed Oct 14, 2023
1 parent e5099e1 commit 62e408b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 65 deletions.
27 changes: 15 additions & 12 deletions samehada/request_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ type RequestManager struct {
}

func NewRequestManager(sdb *SamehadaDB) *RequestManager {
ch := make(chan *reqResult, 1000000)
//ch := make(chan *reqResult, 1000000)
ch := make(chan *reqResult, 100)
//ch := make(chan *reqResult)
return &RequestManager{sdb, 0, make([]*queryRequest, 0), new(sync.Mutex), 0, &ch, true}
}

Expand Down Expand Up @@ -73,25 +75,26 @@ func (reqManager *RequestManager) executeQuedTxns() {
func (reqManager *RequestManager) Run() {
for {
recvVal := <-*reqManager.inCh
if recvVal == nil { // stop signal or new request
if !reqManager.isExecutionActive {
break
}
reqManager.queMutex.Lock()
if len(reqManager.execQue) > 0 && reqManager.curExectingReqNum < common.MaxTxnThreadNum {
reqManager.executeQuedTxns()
}
reqManager.queMutex.Unlock()
} else { // receive result
if recvVal != nil { // receive result
reqManager.queMutex.Lock()
reqManager.curExectingReqNum--

if recvVal.err != nil {
// TODO: (SDB) [PARA] appropriate handling of error is needed
// TODO: (SDB) [PARA] appropriate handling of error (mainly Aborted case) is needed
panic("error on execution")
}
reqManager.queMutex.Unlock()
*recvVal.callerCh <- recvVal
}

// check stop signal or new request
if !reqManager.isExecutionActive {
break
}
reqManager.queMutex.Lock()
if len(reqManager.execQue) > 0 && reqManager.curExectingReqNum < common.MaxTxnThreadNum {
reqManager.executeQuedTxns()
}
reqManager.queMutex.Unlock()
}
}
103 changes: 50 additions & 53 deletions samehada/samehada_test/samehada_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestParallelQueryIssue(t *testing.T) {
}

db := samehada.NewSamehadaDB(t.Name(), 5000) // 5MB
opTimes := 24 //10000
opTimes := 10000 //24 //10000

queryVals := make([]int32, 0)

Expand All @@ -270,23 +270,20 @@ func TestParallelQueryIssue(t *testing.T) {
queryVals = append(queryVals, randVal)
}

//err, _ := db.ExecuteSQLRetValues("CREATE TABLE key_val_list (key int, val int);")
err, _ := db.ExecuteSQL("CREATE TABLE k_v_list(k INT, v INT);")
testingpkg.Assert(t, err == nil, "failed to create table")

//insCh := make(chan int32)
insCh := make(chan int32)
for ii := 0; ii < opTimes; ii++ {
//go func(val int32) {
func(val int32) {
go func(val int32) {
err, _ = db.ExecuteSQL(fmt.Sprintf("INSERT INTO k_v_list(k, v) VALUES (%d, %d);", val, val))
//insCh <- val
testingpkg.Assert(t, err == nil, "failed to insert val: "+strconv.Itoa(int(val)))
insCh <- val
}(queryVals[ii])

//testingpkg.Assert(t, err == nil, "failed to insert val: "+strconv.Itoa(int(queryVals[ii])))
}
//for ii := 0; ii < opTimes; ii++ {
// <-insCh
//}
for ii := 0; ii < opTimes; ii++ {
<-insCh
}

// shuffle query vals array elements
rand.Shuffle(len(queryVals), func(i, j int) { queryVals[i], queryVals[j] = queryVals[j], queryVals[i] })
Expand All @@ -305,37 +302,37 @@ func TestParallelQueryIssue(t *testing.T) {

startTime := time.Now()
for ii := 0; ii < opTimes; ii++ {
/* // wait last go routines finishes
if ii == opTimes-1 {
for runningThCnt > 0 {
recvRslt := <-ch
allCnt++
if recvRslt[1] == -1 {
abotedCnt++
} else {
commitedCnt++
testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0])))
}
runningThCnt--
}
break
// wait last go routines finishes
if ii == opTimes-1 {
for runningThCnt > 0 {
recvRslt := <-ch
allCnt++
if recvRslt[1] == -1 {
abotedCnt++
} else {
commitedCnt++
testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0])))
}
runningThCnt--
}
break
}

// wait for keeping THREAD_NUM * 2 groroutine existing
for runningThCnt >= THREAD_NUM*2 {
recvRslt := <-ch
runningThCnt--
allCnt++
if allCnt%500 == 0 {
fmt.Printf(strconv.Itoa(allCnt) + " queries done\n")
}
if recvRslt[1] == -1 {
abotedCnt++
} else {
commitedCnt++
testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0])))
}
}*/
// wait for keeping THREAD_NUM * 2 groroutine existing
for runningThCnt >= THREAD_NUM*2 {
recvRslt := <-ch
runningThCnt--
allCnt++
if allCnt%500 == 0 {
fmt.Printf(strconv.Itoa(allCnt) + " queries done\n")
}
if recvRslt[1] == -1 {
abotedCnt++
} else {
commitedCnt++
testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0])))
}
}

go func(queryVal int32) {
err_, results := db.ExecuteSQL(fmt.Sprintf("SELECT v FROM k_v_list WHERE k = %d;", queryVal))
Expand All @@ -352,19 +349,19 @@ func TestParallelQueryIssue(t *testing.T) {
runningThCnt++
}

for ii := 0; ii < opTimes; ii++ {
recvRslt := <-ch
allCnt++
if allCnt%500 == 0 {
fmt.Printf(strconv.Itoa(allCnt) + " queries done\n")
}
if recvRslt[1] == -1 {
abotedCnt++
} else {
commitedCnt++
testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0])))
}
}
//for ii := 0; ii < opTimes; ii++ {
// recvRslt := <-ch
// allCnt++
// if allCnt%500 == 0 {
// fmt.Printf(strconv.Itoa(allCnt) + " queries done\n")
// }
// if recvRslt[1] == -1 {
// abotedCnt++
// } else {
// commitedCnt++
// testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0])))
// }
//}

fmt.Println("allCnt: " + strconv.Itoa(allCnt))
fmt.Println("abotedCnt: " + strconv.Itoa(abotedCnt))
Expand Down

0 comments on commit 62e408b

Please sign in to comment.