diff --git a/samehada/request_manager.go b/samehada/request_manager.go index 814ff6dc..9e2fe025 100644 --- a/samehada/request_manager.go +++ b/samehada/request_manager.go @@ -22,7 +22,9 @@ type RequestManager struct { } func NewRequestManager(sdb *SamehadaDB) *RequestManager { - ch := make(chan *reqResult, 1000000) + //ch := make(chan *reqResult, 1000000) + ch := make(chan *reqResult, 100) + //ch := make(chan *reqResult) return &RequestManager{sdb, 0, make([]*queryRequest, 0), new(sync.Mutex), 0, &ch, true} } @@ -73,25 +75,26 @@ func (reqManager *RequestManager) executeQuedTxns() { func (reqManager *RequestManager) Run() { for { recvVal := <-*reqManager.inCh - if recvVal == nil { // stop signal or new request - if !reqManager.isExecutionActive { - break - } - reqManager.queMutex.Lock() - if len(reqManager.execQue) > 0 && reqManager.curExectingReqNum < common.MaxTxnThreadNum { - reqManager.executeQuedTxns() - } - reqManager.queMutex.Unlock() - } else { // receive result + if recvVal != nil { // receive result reqManager.queMutex.Lock() reqManager.curExectingReqNum-- if recvVal.err != nil { - // TODO: (SDB) [PARA] appropriate handling of error is needed + // TODO: (SDB) [PARA] appropriate handling of error (mainly Aborted case) is needed panic("error on execution") } reqManager.queMutex.Unlock() *recvVal.callerCh <- recvVal } + + // check stop signal or new request + if !reqManager.isExecutionActive { + break + } + reqManager.queMutex.Lock() + if len(reqManager.execQue) > 0 && reqManager.curExectingReqNum < common.MaxTxnThreadNum { + reqManager.executeQuedTxns() + } + reqManager.queMutex.Unlock() } } diff --git a/samehada/samehada_test/samehada_test.go b/samehada/samehada_test/samehada_test.go index 23567c8d..70376702 100644 --- a/samehada/samehada_test/samehada_test.go +++ b/samehada/samehada_test/samehada_test.go @@ -261,7 +261,7 @@ func TestParallelQueryIssue(t *testing.T) { } db := samehada.NewSamehadaDB(t.Name(), 5000) // 5MB - opTimes := 24 //10000 + opTimes := 10000 //24 //10000 queryVals := make([]int32, 0) @@ -270,23 +270,20 @@ func TestParallelQueryIssue(t *testing.T) { queryVals = append(queryVals, randVal) } - //err, _ := db.ExecuteSQLRetValues("CREATE TABLE key_val_list (key int, val int);") err, _ := db.ExecuteSQL("CREATE TABLE k_v_list(k INT, v INT);") testingpkg.Assert(t, err == nil, "failed to create table") - //insCh := make(chan int32) + insCh := make(chan int32) for ii := 0; ii < opTimes; ii++ { - //go func(val int32) { - func(val int32) { + go func(val int32) { err, _ = db.ExecuteSQL(fmt.Sprintf("INSERT INTO k_v_list(k, v) VALUES (%d, %d);", val, val)) - //insCh <- val + testingpkg.Assert(t, err == nil, "failed to insert val: "+strconv.Itoa(int(val))) + insCh <- val }(queryVals[ii]) - - //testingpkg.Assert(t, err == nil, "failed to insert val: "+strconv.Itoa(int(queryVals[ii]))) } - //for ii := 0; ii < opTimes; ii++ { - // <-insCh - //} + for ii := 0; ii < opTimes; ii++ { + <-insCh + } // shuffle query vals array elements rand.Shuffle(len(queryVals), func(i, j int) { queryVals[i], queryVals[j] = queryVals[j], queryVals[i] }) @@ -305,37 +302,37 @@ func TestParallelQueryIssue(t *testing.T) { startTime := time.Now() for ii := 0; ii < opTimes; ii++ { - /* // wait last go routines finishes - if ii == opTimes-1 { - for runningThCnt > 0 { - recvRslt := <-ch - allCnt++ - if recvRslt[1] == -1 { - abotedCnt++ - } else { - commitedCnt++ - testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0]))) - } - runningThCnt-- - } - break + // wait last go routines finishes + if ii == opTimes-1 { + for runningThCnt > 0 { + recvRslt := <-ch + allCnt++ + if recvRslt[1] == -1 { + abotedCnt++ + } else { + commitedCnt++ + testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0]))) } + runningThCnt-- + } + break + } - // wait for keeping THREAD_NUM * 2 groroutine existing - for runningThCnt >= THREAD_NUM*2 { - recvRslt := <-ch - runningThCnt-- - allCnt++ - if allCnt%500 == 0 { - fmt.Printf(strconv.Itoa(allCnt) + " queries done\n") - } - if recvRslt[1] == -1 { - abotedCnt++ - } else { - commitedCnt++ - testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0]))) - } - }*/ + // wait for keeping THREAD_NUM * 2 groroutine existing + for runningThCnt >= THREAD_NUM*2 { + recvRslt := <-ch + runningThCnt-- + allCnt++ + if allCnt%500 == 0 { + fmt.Printf(strconv.Itoa(allCnt) + " queries done\n") + } + if recvRslt[1] == -1 { + abotedCnt++ + } else { + commitedCnt++ + testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0]))) + } + } go func(queryVal int32) { err_, results := db.ExecuteSQL(fmt.Sprintf("SELECT v FROM k_v_list WHERE k = %d;", queryVal)) @@ -352,19 +349,19 @@ func TestParallelQueryIssue(t *testing.T) { runningThCnt++ } - for ii := 0; ii < opTimes; ii++ { - recvRslt := <-ch - allCnt++ - if allCnt%500 == 0 { - fmt.Printf(strconv.Itoa(allCnt) + " queries done\n") - } - if recvRslt[1] == -1 { - abotedCnt++ - } else { - commitedCnt++ - testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0]))) - } - } + //for ii := 0; ii < opTimes; ii++ { + // recvRslt := <-ch + // allCnt++ + // if allCnt%500 == 0 { + // fmt.Printf(strconv.Itoa(allCnt) + " queries done\n") + // } + // if recvRslt[1] == -1 { + // abotedCnt++ + // } else { + // commitedCnt++ + // testingpkg.Assert(t, recvRslt[0] == recvRslt[1], "failed to select val: "+strconv.Itoa(int(recvRslt[0]))) + // } + //} fmt.Println("allCnt: " + strconv.Itoa(allCnt)) fmt.Println("abotedCnt: " + strconv.Itoa(abotedCnt))