Skip to content

Commit

Permalink
all: feats & optimizes
Browse files Browse the repository at this point in the history
- feat(me): add new cfg param SpeedLoop
- chore(send): remove unnecessary err chk in write
- perf(listen): use mutex instead of bool checking
  • Loading branch information
fumiama committed Jul 11, 2024
1 parent d695f14 commit c0f31a7
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 46 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: golang-ci

on: [push, pull_request]

jobs:

golang-ci:
name: CI
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@master
with:
go-version: ^1.20

- name: Check out code into the Go module directory
uses: actions/checkout@master

- name: Get dependencies
run: go mod tidy

- name: Build
run: go build -v ./...

- name: Test
run: go test $(go list ./...)

lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@master
with:
go-version: ^1.20

- name: Check out code into the Go module directory
uses: actions/checkout@master

- name: golangci-lint
uses: golangci/golangci-lint-action@master
with:
version: latest
1 change: 1 addition & 0 deletions config/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
PrivateKey string `yaml:"PrivateKey"`
EndPoint string `yaml:"EndPoint"`
MTU int64 `yaml:"MTU"`
SpeedLoop uint16 `yaml:"SpeedLoop"`
Mask uint64 `yaml:"Mask"` // Mask 是异或报文所用掩码, 必须保证各端统一
Peers []Peer `yaml:"Peers"`
}
Expand Down
36 changes: 20 additions & 16 deletions gold/link/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/netip"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"
Expand All @@ -17,60 +18,63 @@ import (
"github.com/fumiama/WireGold/gold/head"
)

// 监听本机 endpoint
func (m *Me) listen() (conn *net.UDPConn, err error) {
conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.myend.String())))
// 监听本机 UDP endpoint
func (m *Me) listenudp() (conn *net.UDPConn, err error) {
conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String())))
if err != nil {
return
}
m.myend = conn.LocalAddr()
logrus.Infoln("[listen] at", m.myend)
m.udpep = conn.LocalAddr()
logrus.Infoln("[listen] at", m.udpep)
go func() {
recvtotlcnt := 0
recvloopcnt := 0
recvtotlcnt := uint64(0)
recvloopcnt := uint16(0)
recvlooptime := time.Now().UnixMilli()
n := runtime.NumCPU()
if n > 64 {
n = 64 // 只用最多 64 核
}
logrus.Infoln("[listen] use cpu num:", n)
listenbuff := make([]byte, 65536*n)
hasntfinished := make([]bool, n)
hasntfinished := make([]sync.Mutex, n)
for i := 0; err == nil; i++ {
i %= n
for hasntfinished[i] {
time.Sleep(time.Millisecond)
for !hasntfinished[i].TryLock() {
i++
i %= n
if i == 0 { // looked up a full round
time.Sleep(time.Millisecond * 10)
}
}
logrus.Debugln("[listen] lock index", i)
lbf := listenbuff[i*65536 : (i+1)*65536]
n, addr, err := conn.ReadFromUDP(lbf)
if err != nil {
logrus.Warnln("[listen] read from udp err, reconnect:", err)
conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.myend.String())))
conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort(m.udpep.String())))
if err != nil {
logrus.Errorln("[listen] reconnect udp err:", err)
return
}
hasntfinished[i].Unlock()
i--
continue
}
recvtotlcnt += n
recvtotlcnt += uint64(n)
recvloopcnt++
if recvloopcnt >= 4096 {
if recvloopcnt%m.speedloop == 0 {
now := time.Now().UnixMilli()
logrus.Infof("[listen] recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime))
recvtotlcnt = 0
recvloopcnt = 0
recvlooptime = now
}
packet := m.wait(lbf[:n])
if packet == nil {
hasntfinished[i].Unlock()
i--
continue
}
hasntfinished[i] = true
go m.listenthread(packet, addr, i, func() { hasntfinished[i] = false })
go m.listenthread(packet, addr, i, hasntfinished[i].Unlock)
}
}()
return
Expand Down
30 changes: 17 additions & 13 deletions gold/link/me.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ type Me struct {
me net.IP
// 本机子网
subnet net.IPNet
// 本机 endpoint
myend net.Addr
// 本机 UDP endpoint
udpep net.Addr
// 本机环回 link
loop *Link
// 本机活跃的所有连接
connections map[string]*Link
// 读写同步锁
connmapmu sync.RWMutex
// 本机监听的 endpoint
myep *net.UDPConn
// 本机监听的 udp 连接, 用于向对端直接发送报文
udpconn *net.UDPConn
// 本机网卡
nic lower.NICIO
// 本机路由表
Expand All @@ -46,25 +46,25 @@ type Me struct {
// 抗重放攻击记录池
recved *ttl.Cache[uint64, bool]
// 本机上层配置
srcport, dstport, mtu uint16
srcport, dstport, mtu, speedloop uint16
// 报头掩码
mask uint64
}

type MyConfig struct {
MyIPwithMask string
MyEndpoint string
PrivateKey *[32]byte
NIC lower.NICIO
SrcPort, DstPort, MTU uint16
Mask uint64
MyIPwithMask string
MyEndpoint string
PrivateKey *[32]byte
NIC lower.NICIO
SrcPort, DstPort, MTU, SpeedLoop uint16
Mask uint64
}

// NewMe 设置本机参数
func NewMe(cfg *MyConfig) (m Me) {
m.privKey = *cfg.PrivateKey
var err error
m.myend, err = net.ResolveUDPAddr("udp", cfg.MyEndpoint)
m.udpep, err = net.ResolveUDPAddr("udp", cfg.MyEndpoint)
if err != nil {
panic(err)
}
Expand All @@ -74,7 +74,7 @@ func NewMe(cfg *MyConfig) (m Me) {
}
m.me = ip
m.subnet = *cidr
m.myep, err = m.listen()
m.udpconn, err = m.listenudp()
if err != nil {
panic(err)
}
Expand All @@ -96,6 +96,10 @@ func NewMe(cfg *MyConfig) (m Me) {
m.srcport = cfg.SrcPort
m.dstport = cfg.DstPort
m.mtu = cfg.MTU & 0xfff8
m.speedloop = cfg.SpeedLoop
if m.speedloop == 0 {
m.speedloop = 4096
}
m.mask = cfg.Mask
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], m.mask)
Expand Down
32 changes: 15 additions & 17 deletions gold/link/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,21 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui
if d == nil {
return 0, errors.New("[send] ttl exceeded")
}
if err == nil {
peerep := l.endpoint
if peerep == nil {
return 0, errors.New("[send] nil endpoint of " + p.Dst.String())
}
bound := 64
endl := "..."
if len(d) < bound {
bound = len(d)
endl = "."
}
logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.myep.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl)
d = l.me.xorenc(d)
logrus.Debugln("[send] data xored", hex.EncodeToString(d[:bound]), endl)
n, err = l.me.myep.WriteToUDP(d, peerep)
cl()
peerep := l.endpoint
if peerep == nil {
return 0, errors.New("[send] nil endpoint of " + p.Dst.String())
}
bound := 64
endl := "..."
if len(d) < bound {
bound = len(d)
endl = "."
}
logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.udpconn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl)
d = l.me.xorenc(d)
logrus.Debugln("[send] data xored", hex.EncodeToString(d[:bound]), endl)
n, err = l.me.udpconn.WriteToUDP(d, peerep)
cl()
return
}
1 change: 1 addition & 0 deletions upper/services/wg/wg.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (wg *WG) init(srcport, dstport uint16) {
SrcPort: srcport,
DstPort: dstport,
MTU: uint16(wg.c.MTU),
SpeedLoop: wg.c.SpeedLoop,
Mask: wg.c.Mask,
})

Expand Down

0 comments on commit c0f31a7

Please sign in to comment.