Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #590 from lowzj/0.3-node
Browse files Browse the repository at this point in the history
feature: parse the port of supernode from --node flag of dfget
  • Loading branch information
starnop authored Jun 6, 2019
2 parents 780fb03 + 26e04fa commit 3f4f4d9
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 39 deletions.
19 changes: 19 additions & 0 deletions cmd/dfget/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func runDfget() error {
util.Printer.Println(err.Error())
return err
}
if err := handleNodes(); err != nil {
util.Printer.Println(err.Error())
return err
}

checkParameters()
logrus.Infof("get cmd params:%q", os.Args)
Expand Down Expand Up @@ -266,6 +270,21 @@ func transFilter(filter string) []string {
return strings.Split(filter, "&")
}

func handleNodes() error {
nodes := make([]string, 0)

for _, v := range cfg.Node {
// TODO: check the validity of v.
if strings.IndexByte(v, ':') > 0 {
nodes = append(nodes, v)
continue
}
nodes = append(nodes, fmt.Sprintf("%s:%d", v, config.DefaultSupernodePort))
}
cfg.Node = nodes
return nil
}

func resultMsg(cfg *config.Config, end time.Time, e *errors.DFGetError) string {
if e != nil {
return fmt.Sprintf("download FAIL(%d) cost:%.3fs length:%d reason:%d error:%v",
Expand Down
2 changes: 2 additions & 0 deletions dfget/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ const (

DataExpireTime = 3 * time.Minute
ServerAliveTime = 5 * time.Minute

DefaultSupernodePort = 8002
)

/* errors code */
Expand Down
54 changes: 26 additions & 28 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,37 @@ const (
// NewSupernodeAPI creates a new instance of SupernodeAPI with default value.
func NewSupernodeAPI() SupernodeAPI {
return &supernodeAPI{
Scheme: "http",
ServicePort: 8002,
Timeout: 5 * time.Second,
HTTPClient: util.DefaultHTTPClient,
Scheme: "http",
Timeout: 5 * time.Second,
HTTPClient: util.DefaultHTTPClient,
}
}

// SupernodeAPI defines the communication methods between supernode and dfget.
type SupernodeAPI interface {
Register(ip string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error)
PullPieceTask(ip string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error)
ReportPiece(ip string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
ServiceDown(ip string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(ip string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
Register(node string, req *types.RegisterRequest) (resp *types.RegisterResponse, e error)
PullPieceTask(node string, req *types.PullPieceTaskRequest) (resp *types.PullPieceTaskResponse, e error)
ReportPiece(node string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
}

type supernodeAPI struct {
Scheme string
ServicePort int
Timeout time.Duration
HTTPClient util.SimpleHTTPClient
Scheme string
Timeout time.Duration
HTTPClient util.SimpleHTTPClient
}

// Register sends a request to the supernode to register itself as a peer
// and create downloading task.
func (api *supernodeAPI) Register(ip string, req *types.RegisterRequest) (
func (api *supernodeAPI) Register(node string, req *types.RegisterRequest) (
resp *types.RegisterResponse, e error) {
var (
code int
body []byte
)
url := fmt.Sprintf("%s://%s:%d%s",
api.Scheme, ip, api.ServicePort, peerRegisterPath)
url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, peerRegisterPath)
if code, body, e = api.HTTPClient.PostJSON(url, req, api.Timeout); e != nil {
return nil, e
}
Expand All @@ -83,47 +81,47 @@ func (api *supernodeAPI) Register(ip string, req *types.RegisterRequest) (

// PullPieceTask pull a piece downloading task from supernode, and get a
// response that describes from which peer to download.
func (api *supernodeAPI) PullPieceTask(ip string, req *types.PullPieceTaskRequest) (
func (api *supernodeAPI) PullPieceTask(node string, req *types.PullPieceTaskRequest) (
resp *types.PullPieceTaskResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?%s",
api.Scheme, ip, api.ServicePort, peerPullPieceTaskPath, util.ParseQuery(req))
url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerPullPieceTaskPath, util.ParseQuery(req))

resp = new(types.PullPieceTaskResponse)
e = api.get(url, resp)
return
}

// ReportPiece reports the status of piece downloading task to supernode.
func (api *supernodeAPI) ReportPiece(ip string, req *types.ReportPieceRequest) (
func (api *supernodeAPI) ReportPiece(node string, req *types.ReportPieceRequest) (
resp *types.BaseResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?%s",
api.Scheme, ip, api.ServicePort, peerReportPiecePath, util.ParseQuery(req))
url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerReportPiecePath, util.ParseQuery(req))

resp = new(types.BaseResponse)
e = api.get(url, resp)
return
}

// ServiceDown reports the status of the local peer to supernode.
func (api *supernodeAPI) ServiceDown(ip string, taskID string, cid string) (
func (api *supernodeAPI) ServiceDown(node string, taskID string, cid string) (
resp *types.BaseResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?taskId=%s&cid=%s",
api.Scheme, ip, api.ServicePort, peerServiceDownPath, taskID, cid)
url := fmt.Sprintf("%s://%s%s?taskId=%s&cid=%s",
api.Scheme, node, peerServiceDownPath, taskID, cid)

resp = new(types.BaseResponse)
e = api.get(url, resp)
return
}

// ReportClientError reports the client error when downloading piece to supernode.
func (api *supernodeAPI) ReportClientError(ip string, req *types.ClientErrorRequest) (
func (api *supernodeAPI) ReportClientError(node string, req *types.ClientErrorRequest) (
resp *types.BaseResponse, e error) {

url := fmt.Sprintf("%s://%s:%d%s?%s",
api.Scheme, ip, api.ServicePort, peerClientErrorPath, util.ParseQuery(req))
url := fmt.Sprintf("%s://%s%s?%s",
api.Scheme, node, peerClientErrorPath, util.ParseQuery(req))

resp = new(types.BaseResponse)
e = api.get(url, resp)
Expand Down
11 changes: 3 additions & 8 deletions dfget/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -268,15 +267,11 @@ func adjustSupernodeList(nodes []string) []string {

func checkConnectSupernode(nodes []string) (localIP string) {
var (
e error
port = 8002
e error
)
for _, n := range nodes {
nodeFields := strings.Split(n, ":")
if len(nodeFields) == 2 {
port, _ = strconv.Atoi(nodeFields[1])
}
if localIP, e = util.CheckConnect(nodeFields[0], port, 1000); e == nil {
ip, port := util.GetIPAndPortFromNode(n, config.DefaultSupernodePort)
if localIP, e = util.CheckConnect(ip, port, 1000); e == nil {
return localIP
}
logrus.Errorf("Connect to node:%s error: %v", n, e)
Expand Down
2 changes: 1 addition & 1 deletion dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *supernodeRegister) Register(peerPort int) (*RegisterResult, *errors.DFG
nodes, nLen := s.cfg.Node, len(s.cfg.Node)
req := s.constructRegisterRequest(peerPort)
for i = 0; i < nLen; i++ {
req.SupernodeIP = nodes[i]
req.SupernodeIP = util.ExtractHost(nodes[i])
resp, e = s.api.Register(nodes[i], req)
logrus.Infof("do register to %s, res:%s error:%v", nodes[i], resp, e)
if e != nil {
Expand Down
51 changes: 51 additions & 0 deletions dfget/util/net_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package util

import (
"strconv"
"strings"
)

// ExtractHost extracts host ip from the giving string.
func ExtractHost(hostAndPort string) string {
fields := strings.Split(strings.TrimSpace(hostAndPort), ":")
return fields[0]
}

// GetIPAndPortFromNode return ip and port by parsing the node value.
// It will return defaultPort as the value of port
// when the node is a string without port or with an illegal port.
func GetIPAndPortFromNode(node string, defaultPort int) (string, int) {
if IsEmptyStr(node) {
return "", defaultPort
}

nodeFields := strings.Split(node, ":")
switch len(nodeFields) {
case 1:
return nodeFields[0], defaultPort
case 2:
port, err := strconv.Atoi(nodeFields[1])
if err != nil {
return nodeFields[0], defaultPort
}
return nodeFields[0], port
default:
return "", defaultPort
}
}
51 changes: 51 additions & 0 deletions dfget/util/net_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package util

import (
"github.com/go-check/check"
)

type UtilSuite struct{}

func init() {
check.Suite(&UtilSuite{})
}

func (suite *UtilSuite) TestExtractHost(c *check.C) {
host := ExtractHost("1:0")
c.Assert(host, check.Equals, "1")
}

func (suite *UtilSuite) TestGetIPAndPortFromNode(c *check.C) {
var cases = []struct {
node string
defaultPort int
expectedIP string
expectedPort int
}{
{"127.0.0.1", 8002, "127.0.0.1", 8002},
{"127.0.0.1:8001", 8002, "127.0.0.1", 8001},
{"127.0.0.1:abcd", 8002, "127.0.0.1", 8002},
}

for _, v := range cases {
ip, port := GetIPAndPortFromNode(v.node, v.defaultPort)
c.Check(ip, check.Equals, v.expectedIP)
c.Check(port, check.Equals, v.expectedPort)
}
}
2 changes: 2 additions & 0 deletions test/cli_dfget_p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-check/check"

"github.com/dragonflyoss/Dragonfly/test/command"
"github.com/dragonflyoss/Dragonfly/test/environment"
)

func init() {
Expand Down Expand Up @@ -52,6 +53,7 @@ func (s *DFGetP2PTestSuite) TestDownload(c *check.C) {
cmd, err := s.starter.DFGet(5*time.Second,
"-u", "https://lowzj.com",
"-o", Join(s.starter.Home, "a.test"),
"--node", fmt.Sprintf("127.0.0.1:%d", environment.SupernodeListenPort),
"--notbs")
cmd.Wait()

Expand Down
6 changes: 4 additions & 2 deletions test/command/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"os/exec"
fp "path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/util"
"github.com/dragonflyoss/Dragonfly/test/environment"
)

var (
Expand Down Expand Up @@ -129,14 +131,14 @@ func (s *Starter) Supernode(running time.Duration, args ...string) (
dir := fp.Join(s.Home, "supernode")
args = append([]string{
"-Dsupernode.baseHome=" + dir,
"-Dserver.port=8002",
"-Dserver.port=" + strconv.Itoa(environment.SupernodeListenPort),
"-jar", supernodePath,
}, args...)

if cmd, err = s.execCmd(running, "java", args...); err != nil {
return nil, err
}
if err = check("localhost", 8002, 5*time.Second); err != nil {
if err = check("localhost", environment.SupernodeListenPort, 5*time.Second); err != nil {
s.Kill(cmd)
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions test/environment/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
)

var (
// SupernodeListenPort is the port that supernode will listen.
SupernodeListenPort = 8008

// DragonflySupernodeBinary is default binary
DragonflySupernodeBinary = "/usr/local/bin/supernode"

Expand Down

0 comments on commit 3f4f4d9

Please sign in to comment.