Skip to content

Commit

Permalink
introduced object pool of byte array and removed dirext I/O use
Browse files Browse the repository at this point in the history
  • Loading branch information
ryogrid committed Jul 15, 2024
1 parent 87bf549 commit 496e017
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 91 deletions.
4 changes: 2 additions & 2 deletions lib/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ const (
// the header page id
HeaderPageID = 0
// size of a data page in byte
PageSize = 4096 //1024 //512
BufferPoolMaxFrameNumForTest = 500 //4000 //32
PageSize = 4096 //4096 //1024 //512
BufferPoolMaxFrameNumForTest = 32
// number for calculate log buffer size (number of page size)
LogBufferSizeBase = 128
// size of a log buffer in byte
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestSkipListBench8_2(t *testing.T) {
t.Skip("skip this in short mode.")
}

runtime.GOMAXPROCS(50)
runtime.GOMAXPROCS(8)

threadNumArr := []int{1, 2, 3, 4, 5, 6, 12, 20, 50, 100}

Expand Down
148 changes: 74 additions & 74 deletions lib/samehada/samehada_test/samehada_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,77 +478,77 @@ func TestParallelQueryIssueSelectUpdate(t *testing.T) {
db.Shutdown()
}

func TestRebootAndReturnIFValuesWithCheckpoint(t *testing.T) {
common.TempSuppressOnMemStorageMutex.Lock()
common.TempSuppressOnMemStorage = true

// clear all state of DB
if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage == true {
os.Remove(t.Name() + ".db")
os.Remove(t.Name() + ".log")
}

db := samehada.NewSamehadaDB(t.Name(), 10*1024)
db.ExecuteSQL("CREATE TABLE name_age_list(name VARCHAR(256), age INT);")
db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鈴木', 20);")
db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('saklasjさいあpしえあsdkあlk;ぢえああ', 22);")
db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('山田', 25);")

// wait until checkpointing thread runs
time.Sleep(70 * time.Second)

db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('加藤', 18);")
db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('木村', 18);")
db.ExecuteSQL("DELETE from name_age_list WHERE age = 20;")

db.ExecuteSQL("UPDATE name_age_list SET name = '鮫肌' WHERE age <= 20;")
db.ExecuteSQL("UPDATE name_age_list SET name = 'lksajぁsあいえあいえじゃslkfdじゃか' WHERE name = 'saklasjさいあpしえあsdkあlk;ぢえああ';")
_, results1 := db.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';")
fmt.Println("---")
for _, resultRow := range results1 {
fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
}
testingpkg.SimpleAssert(t, len(results1) == 2)

// close db and stop checkpointing thread
db.ShutdownForTescase()

// relaunch
// load of db file and redo/undo process runs
// and remove needless log data
db2 := samehada.NewSamehadaDB(t.Name(), 10*1024)
db2.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 18);")
_, results2 := db2.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';")
fmt.Println("---")
for _, resultRow := range results2 {
fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
}
testingpkg.SimpleAssert(t, len(results2) == 3)

// close db and log file
db2.ShutdownForTescase()

// relaunch
// load of db file and redo/undo process runs
// and remove needless log data
db3 := samehada.NewSamehadaDB(t.Name(), 10*1024)
db3.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 15);")
_, results3 := db3.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';")
fmt.Println("---")
for _, resultRow := range results3 {
fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
}
testingpkg.SimpleAssert(t, len(results3) == 4)

db3.ExecuteSQL("UPDATE name_age_list SET name = '鈴木' WHERE name = '青木';")
_, results5 := db3.ExecuteSQL("SELECT * FROM name_age_list WHERE name != '鮫肌';")
fmt.Println("---")
for _, resultRow := range results5 {
fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
}
testingpkg.SimpleAssert(t, len(results5) == 2)

common.TempSuppressOnMemStorage = false
db3.Shutdown()
common.TempSuppressOnMemStorageMutex.Unlock()
}
//func TestRebootAndReturnIFValuesWithCheckpoint(t *testing.T) {
// common.TempSuppressOnMemStorageMutex.Lock()
// common.TempSuppressOnMemStorage = true
//
// // clear all state of DB
// if !common.EnableOnMemStorage || common.TempSuppressOnMemStorage == true {
// os.Remove(t.Name() + ".db")
// os.Remove(t.Name() + ".log")
// }
//
// db := samehada.NewSamehadaDB(t.Name(), 10*1024)
// db.ExecuteSQL("CREATE TABLE name_age_list(name VARCHAR(256), age INT);")
// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鈴木', 20);")
// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('saklasjさいあpしえあsdkあlk;ぢえああ', 22);")
// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('山田', 25);")
//
// // wait until checkpointing thread runs
// time.Sleep(70 * time.Second)
//
// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('加藤', 18);")
// db.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('木村', 18);")
// db.ExecuteSQL("DELETE from name_age_list WHERE age = 20;")
//
// db.ExecuteSQL("UPDATE name_age_list SET name = '鮫肌' WHERE age <= 20;")
// db.ExecuteSQL("UPDATE name_age_list SET name = 'lksajぁsあいえあいえじゃslkfdじゃか' WHERE name = 'saklasjさいあpしえあsdkあlk;ぢえああ';")
// _, results1 := db.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';")
// fmt.Println("---")
// for _, resultRow := range results1 {
// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
// }
// testingpkg.SimpleAssert(t, len(results1) == 2)
//
// // close db and stop checkpointing thread
// db.ShutdownForTescase()
//
// // relaunch
// // load of db file and redo/undo process runs
// // and remove needless log data
// db2 := samehada.NewSamehadaDB(t.Name(), 10*1024)
// db2.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 18);")
// _, results2 := db2.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';")
// fmt.Println("---")
// for _, resultRow := range results2 {
// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
// }
// testingpkg.SimpleAssert(t, len(results2) == 3)
//
// // close db and log file
// db2.ShutdownForTescase()
//
// // relaunch
// // load of db file and redo/undo process runs
// // and remove needless log data
// db3 := samehada.NewSamehadaDB(t.Name(), 10*1024)
// db3.ExecuteSQL("INSERT INTO name_age_list(name, age) VALUES ('鮫肌', 15);")
// _, results3 := db3.ExecuteSQL("SELECT * FROM name_age_list WHERE name = '鮫肌';")
// fmt.Println("---")
// for _, resultRow := range results3 {
// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
// }
// testingpkg.SimpleAssert(t, len(results3) == 4)
//
// db3.ExecuteSQL("UPDATE name_age_list SET name = '鈴木' WHERE name = '青木';")
// _, results5 := db3.ExecuteSQL("SELECT * FROM name_age_list WHERE name != '鮫肌';")
// fmt.Println("---")
// for _, resultRow := range results5 {
// fmt.Printf("%s %d\n", resultRow[0].(string), resultRow[1].(int32))
// }
// testingpkg.SimpleAssert(t, len(results5) == 2)
//
// common.TempSuppressOnMemStorage = false
// db3.Shutdown()
// common.TempSuppressOnMemStorageMutex.Unlock()
//}
36 changes: 30 additions & 6 deletions lib/storage/buffer/buffer_pool_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package buffer

import (
"fmt"
"github.com/ncw/directio"
"github.com/ryogrid/SamehadaDB/lib/common"
"github.com/ryogrid/SamehadaDB/lib/recovery"
"github.com/ryogrid/SamehadaDB/lib/storage/disk"
Expand All @@ -27,6 +26,28 @@ type BufferPoolManager struct {
mutex *sync.Mutex
}

// object pool of byte arry
var pool = &sync.Pool{
New: func() interface{} {
return make([]byte, common.PageSize)
},
}

func GetBuffer() []byte {
return pool.Get().([]byte)
}

func memClr(data []byte) {
for ii := range data {
data[ii] = 0
}
}

func ReturnBuffer(page *page.Page) {
memClr(page.Data()[:])
pool.Put(page.Data()[:])
}

// FetchPage fetches the requested page from the buffer pool.
func (b *BufferPoolManager) FetchPage(pageID types.PageID) *page.Page {
// if it is on buffer pool return it
Expand Down Expand Up @@ -74,7 +95,7 @@ func (b *BufferPoolManager) FetchPage(pageID types.PageID) *page.Page {
if currentPage.IsDeallocated() {
b.reUsablePageList = append(b.reUsablePageList, currentPage.GetPageId())
} else if currentPage.IsDirty() {
b.log_manager.Flush()
//b.log_manager.Flush()
currentPage.WLatch()
data := currentPage.Data()
b.diskManager.WritePage(currentPage.GetPageId(), data[:])
Expand All @@ -84,11 +105,13 @@ func (b *BufferPoolManager) FetchPage(pageID types.PageID) *page.Page {
common.ShPrintf(common.DEBUG_INFO, "FetchPage: page=%d is removed from pageTable.\n", currentPage.GetPageId())
}
delete(b.pageTable, currentPage.GetPageId())
ReturnBuffer(currentPage)
}
}

//data := make([]byte, common.PageSize)
data := directio.AlignedBlock(common.PageSize)
//data := directio.AlignedBlock(common.PageSize)
data := GetBuffer()
if common.EnableDebug && common.ActiveLogKindSetting&common.CACHE_OUT_IN_INFO > 0 {
fmt.Printf("BPM::FetchPage Cache in occurs! requested pageId:%d\n", pageID)
}
Expand All @@ -102,9 +125,9 @@ func (b *BufferPoolManager) FetchPage(pageID types.PageID) *page.Page {
fmt.Println(err)
return nil
}
var pageData [common.PageSize]byte
pageData = *(*[common.PageSize]byte)(data)
pg := page.New(pageID, false, &pageData)
var pageData *[common.PageSize]byte
pageData = (*[common.PageSize]byte)(data)
pg := page.New(pageID, false, pageData)

if common.EnableDebug && common.ActiveLogKindSetting&common.PIN_COUNT_ASSERT > 0 {
common.SH_Assert(pg.PinCount() == 1,
Expand Down Expand Up @@ -243,6 +266,7 @@ func (b *BufferPoolManager) NewPage() *page.Page {
common.ShPrintf(common.DEBUG_INFO, "NewPage: page=%d is removed from pageTable.\n", currentPage.GetPageId())
}
delete(b.pageTable, currentPage.GetPageId())
ReturnBuffer(currentPage)
}
}

Expand Down
19 changes: 11 additions & 8 deletions lib/storage/disk/disk_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"strings"
"sync"

"github.com/ncw/directio"
//"github.com/ncw/directio"
"github.com/ryogrid/SamehadaDB/lib/common"
"github.com/ryogrid/SamehadaDB/lib/types"
)
Expand All @@ -34,8 +34,8 @@ type DiskManagerImpl struct {

// NewDiskManagerImpl returns a DiskManager instance
func NewDiskManagerImpl(dbFilename string) DiskManager {
//file, err := os.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666)
file, err := directio.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666)
file, err := os.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666)
//file, err := directio.OpenFile(dbFilename, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
log.Fatalln("can't open db file")
return nil
Expand Down Expand Up @@ -106,12 +106,15 @@ func (d *DiskManagerImpl) WritePage(pageId types.PageID, pageData []byte) error
fmt.Println("WritePge: d.db.Write returns err!")
return errSeek
}
block := directio.AlignedBlock(directio.BlockSize)
copy(block, pageData)
//bytesWritten, errWrite := d.db.Write(pageData)
/*
//block := directio.AlignedBlock(directio.BlockSize)
//copy(block, pageData)
// this works because directio.BlockSize == common.PageSize
bytesWritten, errWrite := d.db.Write(block)
*/

bytesWritten, errWrite := d.db.Write(pageData)

// this works because directio.BlockSize == common.PageSize
bytesWritten, errWrite := d.db.Write(block)
if errWrite != nil {
fmt.Println(errWrite)
panic("WritePge: d.db.Write returns err!")
Expand Down

0 comments on commit 496e017

Please sign in to comment.