Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: local storage reservations fixes #11866

Merged
merged 10 commits into from
Apr 15, 2024
8 changes: 8 additions & 0 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
return xerrors.Errorf("computing replica id: %w", err)
}

// make sure the cache dir is empty
if err := os.RemoveAll(paths.Cache); err != nil {
return xerrors.Errorf("removing cache dir: %w", err)
}
if err := os.MkdirAll(paths.Cache, 0755); err != nil {
return xerrors.Errorf("mkdir cache dir: %w", err)
}

// generate new sector key
err = ffi.GenerateSDR(
sector.ProofType,
Expand Down
8 changes: 7 additions & 1 deletion curiosrc/ffi/task_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ffi

import (
"context"
"sync"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -170,9 +171,14 @@ func (t *TaskStorage) Claim(taskID int) error {
return err
}

var releaseOnce sync.Once
releaseFunc := func() {
releaseOnce.Do(release)
}

sres := &StorageReservation{
SectorRef: sectorRef,
Release: release,
Release: releaseFunc,
Paths: pathsFs,
PathIDs: pathIDs,

Expand Down
99 changes: 74 additions & 25 deletions storage/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"os"
"path/filepath"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -45,20 +46,26 @@ type Local struct {
localLk sync.RWMutex
}

type sectorFile struct {
sid abi.SectorID
ft storiface.SectorFileType
}

type path struct {
local string // absolute local path
maxStorage uint64

reserved int64
reservations map[abi.SectorID]storiface.SectorFileType
reservations map[sectorFile]int64
}

// statExistingSectorForReservation is optional parameter for stat method
// which will make it take into account existing sectors when calculating
// available space for new reservations
type statExistingSectorForReservation struct {
id abi.SectorID
ft storiface.SectorFileType
id abi.SectorID
ft storiface.SectorFileType
overhead int64
}

func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservation) (stat fsutil.FsStat, newResvOnDisk int64, err error) {
Expand All @@ -72,7 +79,7 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat
stat.Reserved = p.reserved
var newReserveOnDisk int64

accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType) (int64, error) {
accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType, overhead int64) (int64, error) {
sp := p.sectorPath(id, fileType)

used, err := ls.DiskUsage(sp)
Expand All @@ -94,35 +101,58 @@ func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservat
return 0, nil
}

log.Debugw("accounting existing files", "id", id, "fileType", fileType, "path", sp, "used", used, "overhead", overhead)
return used, nil
}

for id, ft := range p.reservations {
for _, fileType := range ft.AllSet() {
onDisk, err := accountExistingFiles(id, fileType)
if err != nil {
return fsutil.FsStat{}, 0, err
}
stat.Reserved -= onDisk
for id, oh := range p.reservations {
onDisk, err := accountExistingFiles(id.sid, id.ft, oh)
if err != nil {
return fsutil.FsStat{}, 0, err
}
if onDisk > oh {
log.Warnw("reserved space on disk is greater than expected", "id", id.sid, "fileType", id.ft, "onDisk", onDisk, "oh", oh)
onDisk = oh
}

stat.Reserved -= onDisk
}
for _, reservation := range newReserve {
for _, fileType := range reservation.ft.AllSet() {
if p.reservations[reservation.id]&fileType != 0 {
log.Debugw("accounting existing files for new reservation", "id", reservation.id, "fileType", fileType, "overhead", reservation.overhead)

resID := sectorFile{reservation.id, fileType}

if _, has := p.reservations[resID]; has {
// already accounted for
continue
}

onDisk, err := accountExistingFiles(reservation.id, fileType)
onDisk, err := accountExistingFiles(reservation.id, fileType, reservation.overhead)
if err != nil {
return fsutil.FsStat{}, 0, err
}
if onDisk > reservation.overhead {
log.Warnw("reserved space on disk is greater than expected (new resv)", "id", reservation.id, "fileType", fileType, "onDisk", onDisk, "oh", reservation.overhead)
onDisk = reservation.overhead
}

newReserveOnDisk += onDisk
}
}

if stat.Reserved < 0 {
log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
//log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
var jsonReservations []map[string]interface{}
for id, res := range p.reservations {
jsonReservations = append(jsonReservations, map[string]interface{}{
"id": id.sid,
"ft": id.ft,
"res": res,
})
}

log.Warnw("negative reserved storage", "reserved", stat.Reserved, "origResv", p.reserved, "reservations", len(p.reservations), "newReserveOnDisk", newReserveOnDisk, "reservations", jsonReservations)
stat.Reserved = 0
}

Expand Down Expand Up @@ -199,7 +229,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {

maxStorage: meta.MaxStorage,
reserved: 0,
reservations: map[abi.SectorID]storiface.SectorFileType{},
reservations: map[sectorFile]int64{},
}

fst, _, err := out.stat(st.localStorage)
Expand Down Expand Up @@ -438,7 +468,24 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif

st.localLk.Lock()

done := func() {}
var firstDonebuf []byte
var firstDoneN int

var doneCalled bool
done := func() {
if doneCalled {
curStack := make([]byte, 20480)
curStackN := runtime.Stack(curStack, false)

log.Errorw("double done call", "sector", sid, "fileType", ft, "prevStack", string(firstDonebuf[:firstDoneN]), "curStack", string(curStack[:curStackN]))
}

firstDonebuf = make([]byte, 20480)
firstDoneN = runtime.Stack(firstDonebuf, false)
magik6k marked this conversation as resolved.
Show resolved Hide resolved

doneCalled = true
}

deferredDone := func() { done() }
defer func() {
st.localLk.Unlock()
snadrus marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -453,13 +500,13 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
return nil, errPathNotFound
}

stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType})
overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen

stat, resvOnDisk, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType, overhead})
if err != nil {
return nil, xerrors.Errorf("getting local storage stat: %w", err)
}

overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen

if overhead-resvOnDisk < 0 {
log.Errorw("negative overhead vs on-disk data", "overhead", overhead, "on-disk", resvOnDisk, "id", id, "sector", sid, "fileType", fileType)
resvOnDisk = overhead
Expand All @@ -469,22 +516,24 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available))
}

resID := sectorFile{sid.ID, fileType}

log.Debugw("reserve add", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved+overhead)

p.reserved += overhead
p.reservations[sid.ID] |= fileType
p.reservations[resID] = overhead

prevDone := done
saveFileType := fileType
done = func() {
prevDone()

st.localLk.Lock()
defer st.localLk.Unlock()

log.Debugw("reserve done", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved-overhead)

p.reserved -= overhead
p.reservations[sid.ID] ^= saveFileType
if p.reservations[sid.ID] == storiface.FTNone {
delete(p.reservations, sid.ID)
}
delete(p.reservations, resID)
}
}

Expand Down