Skip to content

Commit

Permalink
支持多连接 & 客户端可以指定服务要监听的地址
Browse files Browse the repository at this point in the history
  • Loading branch information
editso committed Dec 2, 2021
1 parent 66cc4e3 commit 6ca5d7e
Show file tree
Hide file tree
Showing 11 changed files with 746 additions and 145 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,26 @@ A fast, stable, cross-platform and efficient intranet penetration and port forwa
2. 确保服务端端口(`9003`)未被使用
3. 确保转发的端口(`80`)有服务在运行
4. 转发成功后需要访问的端口由服务端随机分配
5. 服务端出现 **Actual access address 0.0.0.0:xxxx**日志则代表转发服务已准备就绪
5. 服务端出现 **New mapping xxxx -> xxxx**日志则代表转发服务已准备就绪

4. 配置
`Fuso` 的所有配置都是通过参数传递的方式
打开终端运行 `[fus or fuc] --help` 即可获取帮助信息

5. 高级用法 `>1.0.2`
1. 支持从客户端指定服务端要监听的端口(*前提你所指定的端口没有被占用!*) 用法:
`fuc [--bind or -b] 端口`
2. 支持多连接


### 🤔Features
| Name | <font color="green">✔(Achieved)</font> / <font color="red">❌(Unrealized)</font>) |
| ---------------------- | -------------------------------------------------------------------------------- |
| 基本转发 (Forward) | <font color="green">✔</font> |
| 传输加密 (Encrypt) | <font color="green">✔</font> |
| Socks5代理 (Socks5) | <font color="green">✔</font> |
| Socks5代理 (Socks5) | <font color="green">✔</font> |
| UDP支持 (udp support) ||
| 多映射 | |
| 多映射 | <font color="green">✔</font> |
| 级联代理 ||
| 数据传输压缩 ||

Expand Down
10 changes: 9 additions & 1 deletion fuso-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ impl<T, Store> Clone for Rollback<T, Store>
where
T: Clone,
{
#[inline]
fn clone(&self) -> Self {
Self {
target: self.target.clone(),
Expand All @@ -375,6 +376,13 @@ impl Rollback<TcpStream, Buffer<u8>> {
}
}

impl From<Rollback<TcpStream, Buffer<u8>>> for TcpStream {
#[inline]
fn from(roll: Rollback<TcpStream, Buffer<u8>>) -> Self {
roll.target.lock().unwrap().clone()
}
}

#[async_trait]
impl<T> AsyncRead for Rollback<T, Buffer<u8>>
where
Expand Down Expand Up @@ -429,7 +437,7 @@ where
impl<T> AsyncWrite for Rollback<T, Buffer<u8>>
where
T: AsyncWrite + Unpin + Send + Sync + 'static,
{
{
#[inline]
fn poll_write(
self: std::pin::Pin<&mut Self>,
Expand Down
22 changes: 22 additions & 0 deletions fuso-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,32 @@ impl From<smol::channel::RecvError> for Error {
}
}

impl<T> From<smol::channel::SendError<T>> for Error
where
T: Into<String>,
{
fn from(e: smol::channel::SendError<T>) -> Self {
Self {
repr: Repr::IO(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
)),
}
}
}

impl From<&str> for Error {
fn from(txt: &str) -> Self {
Self {
repr: Repr::Fuso(ErrorKind::Customer(txt.into())),
}
}
}

impl From<String> for Error {
fn from(txt: String) -> Self {
Self {
repr: Repr::Fuso(ErrorKind::Customer(txt.into())),
}
}
}
89 changes: 63 additions & 26 deletions fuso-core/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,96 @@
use std::net::SocketAddr;

use bytes::Bytes;
use fuso_api::{async_trait, Error, FusoListener, FusoPacket, Packet, Result};
use fuso_api::{async_trait, Error, FusoListener, FusoPacket, Result, Spwan};

use futures::AsyncWriteExt;
use smol::{
channel::{unbounded, Receiver},
net::TcpStream,
};

use crate::cmd::{CMD_BIND, CMD_CREATE, CMD_PING};
use crate::packet::Action;
use crate::retain::Heartbeat;

#[allow(unused)]
#[derive(Debug)]
pub struct Reactor {
conv: u64,
action: Action,
addr: SocketAddr,
packet: Packet,
}

pub struct Fuso {
accept_ax: Receiver<Reactor>,
}

impl Fuso {
pub async fn bind(addr: SocketAddr) -> Result<Self> {
pub async fn bind(addr: SocketAddr, bind_port: u16) -> Result<Self> {
// let stream = addr.tcp_connect().await?;

let mut stream = TcpStream::connect(addr)
.await
.map_err(|e| Error::with_io(e))?
.guard(5000)
.map_err(|e| Error::with_io(e))?;

stream
.send(
Action::Bind({
if bind_port == 0 {
None
} else {
Some(format!("0.0.0.0:{}", bind_port).parse().unwrap())
}
})
.into(),
)
.await?;

stream.send(Packet::new(CMD_BIND, Bytes::new())).await?;
let action: Action = stream.recv().await?.try_into()?;

let mut stream = stream.guard(5000).await?;

let (accept_tx, accept_ax) = unbounded();

smol::spawn(async move {
loop {
match stream.recv().await {
Err(e) => {
log::warn!("[fuc] Server disconnect {}", e);
break;
}
Ok(packet) if packet.get_cmd() == CMD_PING => {}
Ok(packet) => {
if let Err(_) = accept_tx.send(Reactor { addr, packet }).await {
let _ = stream.close().await;
break;
};
match action {
Action::Accept(conv) => {
log::debug!("Service binding is successful {}", conv);

async move {
loop {
match stream.recv().await {
Err(e) => {
log::warn!("[fuc] Server disconnect {}", e);
break;
}
Ok(packet) => {
let action: Result<Action> = packet.try_into();
match action {
Ok(Action::Ping) => {}

Ok(action) => {
match accept_tx.send(Reactor { conv, action, addr }).await {
Err(_) => {
let _ = stream.close().await;
break;
}
_ => {}
};
}
Err(e) => {
log::debug!("{}", e);
}
}
}
}
}
}
.detach();
}
Action::Err(e) => {
log::error!("Server error message {}", e);
panic!()
}
})
.detach();
_ => {}
}

Ok(Self { accept_ax })
}
Expand All @@ -63,9 +102,7 @@ impl Reactor {
let mut stream = TcpStream::connect(self.addr)
.await
.map_err(|e| Error::with_io(e))?;

stream.send(Packet::new(CMD_CREATE, Bytes::new())).await?;

stream.send(Action::Connect(self.conv).into()).await?;
Ok(stream)
}
}
Expand Down
Loading

0 comments on commit 6ca5d7e

Please sign in to comment.