-
Notifications
You must be signed in to change notification settings - Fork 1
/
replicaset.go
311 lines (249 loc) · 9.08 KB
/
replicaset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package vshard_router //nolint:revive
import (
"context"
"fmt"
"math"
"time"
"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)
type ReplicasetInfo struct {
Name string
UUID uuid.UUID
Weight float64
PinnedCount uint64
IgnoreDisbalance bool
}
func (rsi ReplicasetInfo) String() string {
return fmt.Sprintf("{name: %s, uuid: %s}", rsi.Name, rsi.UUID)
}
type ReplicasetCallOpts struct {
PoolMode pool.Mode
Timeout time.Duration
}
type Replicaset struct {
conn pool.Pooler
info ReplicasetInfo
EtalonBucketCount uint64
}
func (rs *Replicaset) String() string {
return rs.info.String()
}
func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error) {
future := rs.bucketStatAsync(ctx, bucketID)
return bucketStatWait(future)
}
func (rs *Replicaset) bucketStatAsync(ctx context.Context, bucketID uint64) *tarantool.Future {
const bucketStatFnc = "vshard.storage.bucket_stat"
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RO}, bucketStatFnc, []interface{}{bucketID})
}
type vshardStorageBucketStatResponseProto struct {
ok bool
info BucketStatInfo
err StorageCallVShardError
}
func (r *vshardStorageBucketStatResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
// bucket_stat returns pair: stat, err
// https://github.com/tarantool/vshard/blob/e1c806e1d3d2ce8a4e6b4d498c09051bf34ab92a/vshard/storage/init.lua#L1413
respArrayLen, err := d.DecodeArrayLen()
if err != nil {
return err
}
if respArrayLen == 0 {
return fmt.Errorf("protocol violation bucketStatWait: empty response")
}
code, err := d.PeekCode()
if err != nil {
return err
}
if code == msgpcode.Nil {
err = d.DecodeNil()
if err != nil {
return err
}
if respArrayLen != 2 {
return fmt.Errorf("protocol violation bucketStatWait: length is %d on vshard error case", respArrayLen)
}
err = d.Decode(&r.err)
if err != nil {
return fmt.Errorf("failed to decode storage vshard error: %w", err)
}
return nil
}
err = d.Decode(&r.info)
if err != nil {
return fmt.Errorf("failed to decode bucket stat info: %w", err)
}
r.ok = true
return nil
}
func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) {
var bucketStatResponse vshardStorageBucketStatResponseProto
err := future.GetTyped(&bucketStatResponse)
if err != nil {
return BucketStatInfo{}, err
}
if !bucketStatResponse.ok {
return BucketStatInfo{}, bucketStatResponse.err
}
return bucketStatResponse.info, nil
}
// ReplicaCall perform function on remote storage
// link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661
// Deprecated: ReplicaCall is deprecated,
// because looks like it has a little bit broken interface.
// See https://github.com/tarantool/go-vshard-router/issues/42.
// Use CallAsync instead.
func (rs *Replicaset) ReplicaCall(
ctx context.Context,
opts ReplicasetCallOpts,
fnc string,
args interface{},
) (interface{}, StorageResultTypedFunc, error) {
if opts.Timeout == 0 {
opts.Timeout = callTimeoutDefault
}
future := rs.CallAsync(ctx, opts, fnc, args)
respData, err := future.Get()
if err != nil {
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
}
if len(respData) == 0 {
// Since this method returns the first element of respData by contract, we can't return anything is this case (broken interface)
return nil, nil, fmt.Errorf("%s response data is empty", fnc)
}
return respData[0], func(result ...interface{}) error {
return future.GetTyped(&result)
}, nil
}
// CallAsync sends async request to remote storage
func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future {
if opts.Timeout > 0 {
// Don't set any timeout by default, parent context timeout would be inherited in this case.
// Don't call cancel in defer, because this we send request asynchronously,
// and wait for result outside from this function.
// suppress linter warning: lostcancel: the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (govet)
//nolint:govet
ctx, _ = context.WithTimeout(ctx, opts.Timeout)
}
req := tarantool.NewCallRequest(fnc).
Context(ctx).
Args(args)
return rs.conn.Do(req, opts.PoolMode)
}
func (rs *Replicaset) bucketsDiscoveryAsync(ctx context.Context, from uint64) *tarantool.Future {
const bucketsDiscoveryFnc = "vshard.storage.buckets_discovery"
var bucketsDiscoveryPaginationRequest = struct {
From uint64 `msgpack:"from"`
}{From: from}
return rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.PreferRO}, bucketsDiscoveryFnc,
[]interface{}{bucketsDiscoveryPaginationRequest})
}
type bucketsDiscoveryResp struct {
Buckets []uint64 `msgpack:"buckets"`
NextFrom uint64 `msgpack:"next_from"`
}
func bucketsDiscoveryWait(future *tarantool.Future) (bucketsDiscoveryResp, error) {
// We intentionally don't support old vshard storages that mentioned here:
// https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/router/init.lua#L343
var resp bucketsDiscoveryResp
err := future.GetTyped(&[]interface{}{&resp})
if err != nil {
return resp, fmt.Errorf("future.GetTyped() failed: %v", err)
}
return resp, nil
}
func (rs *Replicaset) bucketsDiscovery(ctx context.Context, from uint64) (bucketsDiscoveryResp, error) {
future := rs.bucketsDiscoveryAsync(ctx, from)
return bucketsDiscoveryWait(future)
}
// CalculateEtalonBalance computes the ideal bucket count for each replicaset.
// This iterative algorithm seeks the optimal balance within a cluster by
// calculating the ideal bucket count for each replicaset at every step.
// If the ideal count cannot be achieved due to pinned buckets, the algorithm
// makes a best effort to approximate balance by ignoring the replicaset with
// pinned buckets and its associated pinned count. After each iteration, a new
// balance is recalculated. However, this can lead to scenarios where the
// conditions are still unmet; ignoring pinned buckets in overloaded
// replicasets can reduce the ideal bucket count in others, potentially
// causing new values to fall below their pinned count.
//
// At each iteration, the algorithm either concludes or disregards at least
// one new overloaded replicaset. Therefore, its time complexity is O(N^2),
// where N is the number of replicasets.
// based on https://github.com/tarantool/vshard/blob/master/vshard/replicaset.lua#L1358
func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error {
isBalanceFound := false
weightSum := 0.0
stepCount := 0
replicasetCount := len(replicasets)
// Calculate total weight
for _, replicaset := range replicasets {
weightSum += replicaset.info.Weight
}
// Balance calculation loop
for !isBalanceFound {
stepCount++
if weightSum <= 0 {
return fmt.Errorf("weightSum should be greater than 0")
}
bucketPerWeight := float64(bucketCount) / weightSum
bucketsCalculated := uint64(0)
// Calculate etalon bucket count for each replicaset
for i := range replicasets {
if !replicasets[i].info.IgnoreDisbalance {
replicasets[i].EtalonBucketCount = uint64(math.Ceil(replicasets[i].info.Weight * bucketPerWeight))
bucketsCalculated += replicasets[i].EtalonBucketCount
}
}
bucketsRest := bucketsCalculated - bucketCount
isBalanceFound = true
// Spread disbalance and check for pinned buckets
for i := range replicasets {
if !replicasets[i].info.IgnoreDisbalance {
if bucketsRest > 0 {
n := replicasets[i].info.Weight * bucketPerWeight
ceil := math.Ceil(n)
floor := math.Floor(n)
if replicasets[i].EtalonBucketCount > 0 && ceil != floor {
replicasets[i].EtalonBucketCount--
bucketsRest--
}
}
// Handle pinned buckets
pinned := replicasets[i].info.PinnedCount
if pinned > 0 && replicasets[i].EtalonBucketCount < pinned {
isBalanceFound = false
bucketCount -= pinned
replicasets[i].EtalonBucketCount = pinned
replicasets[i].info.IgnoreDisbalance = true
weightSum -= replicasets[i].info.Weight
}
}
}
if bucketsRest != 0 {
return fmt.Errorf("bucketsRest should be 0")
}
// Safety check to prevent infinite loops
if stepCount > replicasetCount {
return fmt.Errorf("[PANIC]: the rebalancer is broken")
}
}
return nil
}
func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error) {
const bucketCountFnc = "vshard.storage.buckets_count"
var bucketCount uint64
fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.ANY}, bucketCountFnc, nil)
err := fut.GetTyped(&[]interface{}{&bucketCount})
return bucketCount, err
}
func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error {
const bucketForceCreateFnc = "vshard.storage.bucket_force_create"
fut := rs.CallAsync(ctx, ReplicasetCallOpts{PoolMode: pool.RW}, bucketForceCreateFnc, []interface{}{firstBucketID, count})
_, err := fut.GetResponse()
return err
}