diff --git a/README.md b/README.md
index 891900c..4f9dde8 100644
--- a/README.md
+++ b/README.md
@@ -41,20 +41,26 @@ A fast, stable, cross-platform and efficient intranet penetration and port forwa
2. 确保服务端端口(`9003`)未被使用
3. 确保转发的端口(`80`)有服务在运行
4. 转发成功后需要访问的端口由服务端随机分配
- 5. 服务端出现 **Actual access address 0.0.0.0:xxxx**日志则代表转发服务已准备就绪
+ 5. 服务端出现 **New mapping xxxx -> xxxx**日志则代表转发服务已准备就绪
4. 配置
`Fuso` 的所有配置都是通过参数传递的方式
打开终端运行 `[fus or fuc] --help` 即可获取帮助信息
+5. 高级用法 `>1.0.2`
+ 1. 支持从客户端指定服务端要监听的端口(*前提你所指定的端口没有被占用!*) 用法:
+ `fuc [--bind or -b] 端口`
+ 2. 支持多连接
+
+
### 🤔Features
| Name | ✔(Achieved) / ❌(Unrealized)) |
| ---------------------- | -------------------------------------------------------------------------------- |
| 基本转发 (Forward) | ✔ |
| 传输加密 (Encrypt) | ✔ |
-| Socks5代理 (Socks5) | ✔ |
+| Socks5代理 (Socks5) | ✔ |
| UDP支持 (udp support) | ❌ |
-| 多映射 | ❌ |
+| 多映射 | ✔ |
| 级联代理 | ❌ |
| 数据传输压缩 | ❌ |
diff --git a/fuso-api/src/core.rs b/fuso-api/src/core.rs
index 38c2d32..23cbc20 100644
--- a/fuso-api/src/core.rs
+++ b/fuso-api/src/core.rs
@@ -354,6 +354,7 @@ impl Clone for Rollback
where
T: Clone,
{
+ #[inline]
fn clone(&self) -> Self {
Self {
target: self.target.clone(),
@@ -375,6 +376,13 @@ impl Rollback> {
}
}
+impl From>> for TcpStream {
+ #[inline]
+ fn from(roll: Rollback>) -> Self {
+ roll.target.lock().unwrap().clone()
+ }
+}
+
#[async_trait]
impl AsyncRead for Rollback>
where
@@ -429,7 +437,7 @@ where
impl AsyncWrite for Rollback>
where
T: AsyncWrite + Unpin + Send + Sync + 'static,
-{
+{
#[inline]
fn poll_write(
self: std::pin::Pin<&mut Self>,
diff --git a/fuso-api/src/error.rs b/fuso-api/src/error.rs
index 6e1fd73..24ec258 100644
--- a/fuso-api/src/error.rs
+++ b/fuso-api/src/error.rs
@@ -73,6 +73,20 @@ impl From for Error {
}
}
+impl From> for Error
+where
+ T: Into,
+{
+ fn from(e: smol::channel::SendError) -> Self {
+ Self {
+ repr: Repr::IO(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ e.to_string(),
+ )),
+ }
+ }
+}
+
impl From<&str> for Error {
fn from(txt: &str) -> Self {
Self {
@@ -80,3 +94,11 @@ impl From<&str> for Error {
}
}
}
+
+impl From for Error {
+ fn from(txt: String) -> Self {
+ Self {
+ repr: Repr::Fuso(ErrorKind::Customer(txt.into())),
+ }
+ }
+}
diff --git a/fuso-core/src/client.rs b/fuso-core/src/client.rs
index 7157028..59db54d 100644
--- a/fuso-core/src/client.rs
+++ b/fuso-core/src/client.rs
@@ -1,7 +1,6 @@
use std::net::SocketAddr;
-use bytes::Bytes;
-use fuso_api::{async_trait, Error, FusoListener, FusoPacket, Packet, Result};
+use fuso_api::{async_trait, Error, FusoListener, FusoPacket, Result, Spwan};
use futures::AsyncWriteExt;
use smol::{
@@ -9,13 +8,15 @@ use smol::{
net::TcpStream,
};
-use crate::cmd::{CMD_BIND, CMD_CREATE, CMD_PING};
+use crate::packet::Action;
use crate::retain::Heartbeat;
#[allow(unused)]
+#[derive(Debug)]
pub struct Reactor {
+ conv: u64,
+ action: Action,
addr: SocketAddr,
- packet: Packet,
}
pub struct Fuso {
@@ -23,35 +24,73 @@ pub struct Fuso {
}
impl Fuso {
- pub async fn bind(addr: SocketAddr) -> Result {
+ pub async fn bind(addr: SocketAddr, bind_port: u16) -> Result {
+ // let stream = addr.tcp_connect().await?;
+
let mut stream = TcpStream::connect(addr)
.await
- .map_err(|e| Error::with_io(e))?
- .guard(5000)
+ .map_err(|e| Error::with_io(e))?;
+
+ stream
+ .send(
+ Action::Bind({
+ if bind_port == 0 {
+ None
+ } else {
+ Some(format!("0.0.0.0:{}", bind_port).parse().unwrap())
+ }
+ })
+ .into(),
+ )
.await?;
- stream.send(Packet::new(CMD_BIND, Bytes::new())).await?;
+ let action: Action = stream.recv().await?.try_into()?;
+
+ let mut stream = stream.guard(5000).await?;
let (accept_tx, accept_ax) = unbounded();
- smol::spawn(async move {
- loop {
- match stream.recv().await {
- Err(e) => {
- log::warn!("[fuc] Server disconnect {}", e);
- break;
- }
- Ok(packet) if packet.get_cmd() == CMD_PING => {}
- Ok(packet) => {
- if let Err(_) = accept_tx.send(Reactor { addr, packet }).await {
- let _ = stream.close().await;
- break;
- };
+ match action {
+ Action::Accept(conv) => {
+ log::debug!("Service binding is successful {}", conv);
+
+ async move {
+ loop {
+ match stream.recv().await {
+ Err(e) => {
+ log::warn!("[fuc] Server disconnect {}", e);
+ break;
+ }
+ Ok(packet) => {
+ let action: Result = packet.try_into();
+ match action {
+ Ok(Action::Ping) => {}
+
+ Ok(action) => {
+ match accept_tx.send(Reactor { conv, action, addr }).await {
+ Err(_) => {
+ let _ = stream.close().await;
+ break;
+ }
+ _ => {}
+ };
+ }
+ Err(e) => {
+ log::debug!("{}", e);
+ }
+ }
+ }
+ }
}
}
+ .detach();
+ }
+ Action::Err(e) => {
+ log::error!("Server error message {}", e);
+ panic!()
}
- })
- .detach();
+ _ => {}
+ }
Ok(Self { accept_ax })
}
@@ -63,9 +102,7 @@ impl Reactor {
let mut stream = TcpStream::connect(self.addr)
.await
.map_err(|e| Error::with_io(e))?;
-
- stream.send(Packet::new(CMD_CREATE, Bytes::new())).await?;
-
+ stream.send(Action::Connect(self.conv).into()).await?;
Ok(stream)
}
}
diff --git a/fuso-core/src/core.rs b/fuso-core/src/core.rs
index 5e65b50..1600dc3 100644
--- a/fuso-core/src/core.rs
+++ b/fuso-core/src/core.rs
@@ -1,71 +1,168 @@
-use std::{collections::HashMap, net::SocketAddr, sync::Arc};
+use std::{
+ collections::{HashMap, VecDeque},
+ net::SocketAddr,
+ sync::Arc,
+};
use async_trait::async_trait;
-use futures::{AsyncRead, AsyncWrite, TryStreamExt};
+use futures::{lock::Mutex, AsyncWriteExt, TryStreamExt};
use smol::{
channel::{unbounded, Receiver, Sender},
+ future::FutureExt,
lock::RwLock,
net::TcpStream,
};
-use fuso_api::{AsyncTcpSocketEx, Buffer, Result, Rollback, Spwan};
+use fuso_api::{AsyncTcpSocketEx, FusoPacket, Result, Spwan};
-use crate::dispatch::{Dispatch, Handler};
+use crate::retain::Heartbeat;
+use crate::{dispatch::DynHandler, packet::Action};
+use crate::{
+ dispatch::{StrategyEx, TcpStreamRollback},
+ retain::HeartGuard,
+};
-pub type SyncContext = Arc>>;
+use crate::{
+ dispatch::{Dispatch, Handler, State},
+ handler::ChainHandler,
+};
#[derive(Debug, Clone)]
-pub struct Config {
- pub bind_addr: SocketAddr,
+pub struct Config {
pub debug: bool,
- pub handler: Vec>,
+ pub bind_addr: SocketAddr,
}
-pub struct Context {
- pub config: Arc>,
- pub sessions: Arc>>>,
- pub accept_ax: Sender,
+#[allow(unused)]
+pub struct FusoStream {
+ pub from: IO,
+ pub to: IO,
}
-pub struct FusoStream {
- from: IO,
- to: IO,
+pub struct Channel {
+ pub conv: u64,
+ pub core: HeartGuard,
+ pub config: Arc,
+ pub strategys: Arc, Action>>>>>,
+ pub wait_queue: Arc>>,
+}
+
+#[derive(Clone)]
+pub struct Context {
+ pub alloc_conv: Arc>,
+ pub config: Arc,
+ pub handlers: Arc, ()>>>>>,
+ pub strategys: Arc, Action>>>>>,
+ pub sessions: Arc>>>,
+ pub accept_ax: Sender>,
}
pub struct Fuso {
accept_tx: Receiver,
}
-impl Fuso
-where
- IO: Send + Sync + 'static,
-{
- pub async fn bind(config: Config) -> Result
+pub struct FusoBuilder {
+ pub config: Option,
+ pub handelrs: Vec>>>,
+ pub strategys: Vec, Action>>>>,
+}
+
+impl FusoBuilder> {
+ #[inline]
+ pub fn with_config(mut self, config: Config) -> Self {
+ self.config = Some(config);
+ self
+ }
+
+ #[inline]
+ pub fn with_chain(mut self, with_chain: F) -> Self
where
- H: Handler>, SyncContext> + Send + Sync + 'static,
- T: AsyncRead + AsyncWrite + Send + 'static,
+ F: FnOnce(
+ ChainHandler, fuso_api::Result>>,
+ )
+ -> ChainHandler, fuso_api::Result>>,
{
- let config = Arc::new(config);
+ self.handelrs
+ .push(Arc::new(Box::new(with_chain(ChainHandler::new()))));
+ self
+ }
+
+ #[inline]
+ pub fn chain_strategy(mut self, with_chain: F) -> Self
+ where
+ F: FnOnce(
+ ChainHandler, fuso_api::Result>>,
+ )
+ -> ChainHandler, fuso_api::Result>>,
+ {
+ self.strategys
+ .push(Arc::new(Box::new(with_chain(ChainHandler::new()))));
+
+ self
+ }
+
+ #[inline]
+ pub fn chain_handler(mut self, handler: H) -> Self
+ where
+ H: Handler, ()> + Send + Sync + 'static,
+ {
+ self.handelrs.push(Arc::new(Box::new(handler)));
+ self
+ }
+
+ pub async fn build(self) -> fuso_api::Result>> {
+ let config = Arc::new(self.config.unwrap());
+
let (accept_ax, accept_tx) = unbounded();
let bind_addr = config.bind_addr.clone();
+ let listen = bind_addr.tcp_listen().await?;
+
+ let handlers = Arc::new(self.handelrs);
+ let strategys = Arc::new(self.strategys);
- let cx = Arc::new(RwLock::new(Context {
+ let cx = Arc::new(Context {
config,
accept_ax,
+ handlers,
+ strategys,
sessions: Arc::new(RwLock::new(HashMap::new())),
- }));
+ alloc_conv: Arc::new(Mutex::new(0)),
+ });
async move {
- let _ = bind_addr
- .tcp_listen()
- .await
- .unwrap()
+ log::info!("Service started successfully");
+ log::info!("Bound to {}", bind_addr);
+ log::info!("Waiting to connect");
+
+ let _ = listen
.incoming()
- .try_fold(cx, |cx, stream| async move {
+ .try_fold(cx, |cx, mut stream| async move {
log::debug!("[tcp] accept {}", stream.local_addr().unwrap());
- stream.dispatch(cx.clone()).detach();
+ {
+ let handlers = cx.handlers.clone();
+ let cx = cx.clone();
+ async move {
+ match stream.clone().dispatch(handlers, cx).await {
+ Ok(_) => {
+ log::debug!(
+ "[tcp] Successfully processed {}",
+ stream.local_addr().unwrap()
+ );
+ }
+ Err(_) => {
+ let _ = stream.close().await;
+
+ log::warn!(
+ "[tcp] An illegal connection {}",
+ stream.peer_addr().unwrap()
+ );
+ }
+ };
+ }
+ .detach();
+ }
Ok(cx)
})
@@ -73,10 +170,19 @@ where
}
.detach();
- Ok(Self { accept_tx })
+ Ok(Fuso { accept_tx })
}
}
+impl Fuso> {
+ pub fn builder() -> FusoBuilder> {
+ FusoBuilder {
+ config: None,
+ handelrs: Vec::new(),
+ strategys: Vec::new(),
+ }
+ }
+}
#[async_trait]
impl fuso_api::FusoListener for Fuso
@@ -94,3 +200,182 @@ where
Ok(())
}
}
+
+impl Channel {
+ #[inline]
+ pub async fn try_wake(&self) -> Result {
+ match self.wait_queue.lock().await.pop_front() {
+ Some(tcp) => Ok(tcp),
+ None => Err("No task operation required".into()),
+ }
+ }
+
+ #[inline]
+ pub async fn suspend(&self, tcp: TcpStream) -> Result<()> {
+ self.wait_queue.lock().await.push_back(tcp);
+ Ok(())
+ }
+}
+
+impl Context {
+ #[inline]
+ pub async fn fork(&self) -> (u64, Receiver) {
+ let (accept_tx, accept_ax) = unbounded();
+
+ let mut sessions = self.sessions.write().await;
+
+ let conv = loop {
+ let (conv, _) = self.alloc_conv.lock().await.overflowing_add(1);
+
+ match sessions.get(&conv) {
+ Some(accept_tx) if accept_tx.is_closed() => {
+ break conv;
+ }
+ None => break conv,
+ _ => {}
+ }
+
+ *self.alloc_conv.lock().await = conv;
+ };
+
+ sessions.insert(conv, accept_tx);
+
+ (conv, accept_ax)
+ }
+
+ #[inline]
+ pub async fn route(&self, conv: u64, tcp: TcpStream) -> Result<()> {
+ let sessions = self.sessions.read().await;
+
+ if let Some(accept_tx) = sessions.get(&conv) {
+ if let Err(e) = accept_tx.send(tcp).await {
+ Err(e.to_string().into())
+ } else {
+ Ok(())
+ }
+ } else {
+ Err(format!("Session does not exist {}", conv).into())
+ }
+ }
+
+ pub async fn spwan(&self, tcp: TcpStream, addr: Option) -> Result {
+ let (conv, accept_ax) = self.fork().await;
+ let accept_tx = self.accept_ax.clone();
+ let clinet_addr = tcp.local_addr().unwrap();
+ let mut core = tcp.guard(5000).await?;
+ let bind_addr = addr.unwrap_or("0.0.0.0:0".parse().unwrap());
+ let listen = bind_addr.tcp_listen().await?;
+ let strategys = self.strategys.clone();
+
+ let _ = core.send(Action::Accept(conv).into()).await?;
+
+ let channel = Arc::new(Channel {
+ conv,
+ core,
+ strategys,
+ config: self.config.clone(),
+ wait_queue: Arc::new(Mutex::new(VecDeque::new())),
+ });
+
+ log::info!(
+ "New mapping {} -> {}",
+ listen.local_addr().unwrap(),
+ clinet_addr
+ );
+
+ async move {
+ let accept_future = {
+ let channel = channel.clone();
+ async move {
+ loop {
+ let from = accept_ax.recv().await;
+
+ if from.is_err() {
+ log::warn!("An unavoidable error occurred {}", from.unwrap_err());
+ break;
+ }
+
+ let mut from = from.unwrap();
+ match channel.try_wake().await {
+ Ok(to) => {
+ if let Err(e) = accept_tx.send(FusoStream { from, to }).await {
+ log::error!("An unavoidable error occurred {}", &e);
+ break;
+ }
+ }
+ Err(e) => {
+ let _ = from.close().await;
+ log::warn!("{}", e);
+ }
+ }
+ }
+ }
+ };
+
+ let fuso_future = {
+ let mut io = channel.core.clone();
+ async move {
+ loop {
+ match io.recv().await {
+ Ok(packet) => {
+ log::trace!("Client message received {:?}", packet);
+ }
+ Err(_) => {
+ log::warn!("Client session was aborted");
+ break;
+ }
+ }
+ }
+ }
+ };
+
+ let future_listen = {
+ async move {
+ let _ = listen
+ .incoming()
+ .try_fold(channel, |channel, mut tcp| async move {
+ {
+ log::debug!("connected {}", tcp.peer_addr().unwrap());
+
+ let channel = channel.clone();
+ async move {
+ let mut core = channel.core.clone();
+
+ let action = {
+ tcp.clone()
+ .select(channel.strategys.clone(), channel.clone())
+ .await
+ };
+
+ match action {
+ Ok(action) => {
+ log::debug!("action {:?}", action);
+ let _ = core.send(action.into()).await;
+ let _ = channel.suspend(tcp).await;
+ }
+ Err(e) => {
+ let _ = tcp.close().await;
+ log::warn!(
+ "Unable to process connection {} {}",
+ e,
+ tcp.peer_addr().unwrap()
+ );
+ }
+ }
+ }
+ .detach();
+ }
+
+ Ok(channel)
+ })
+ .await;
+ }
+ };
+
+ accept_future.race(fuso_future.race(future_listen)).await
+ }
+ .detach();
+
+ Ok(conv)
+ }
+}
diff --git a/fuso-core/src/dispatch.rs b/fuso-core/src/dispatch.rs
index 25f6aeb..0051cbb 100644
--- a/fuso-core/src/dispatch.rs
+++ b/fuso-core/src/dispatch.rs
@@ -1,45 +1,96 @@
+use std::sync::Arc;
+
use async_trait::async_trait;
use fuso_api::{Buffer, Rollback, RollbackEx};
+use futures::AsyncWriteExt;
use smol::net::TcpStream;
-use crate::{core::SyncContext, split};
+use crate::packet::Action;
-pub enum State {
+#[derive(Debug, Clone, Copy)]
+pub enum State {
Next,
- Accept,
+ Accept(T),
+}
+
+#[async_trait]
+pub trait Handler {
+ async fn dispose(&self, o: D, c: C) -> fuso_api::Result>;
}
#[async_trait]
-pub trait Handler {
- async fn dispose(&self, o: D, c: C) -> fuso_api::Result;
+pub trait Dispatch {
+ async fn dispatch(self, h: H, cx: C) -> fuso_api::Result<()>;
}
#[async_trait]
-pub trait Dispatch {
- async fn dispatch(self, cx: C) -> fuso_api::Result<()>;
+pub trait StrategyEx {
+ async fn select(self, strategys: S, cx: C) -> fuso_api::Result;
}
+pub type TcpStreamRollback = Rollback>;
+
+pub type DynHandler = dyn Handler + Send + Sync + 'static;
#[async_trait]
-impl Dispatch> for TcpStream
+impl Dispatch>>>>, C> for TcpStream
where
- H: Handler>, SyncContext> + Send + Sync + 'static,
- IO: Send + Sync + 'static,
+ C: Clone + Send + Sync + 'static,
{
- async fn dispatch(self, cx: SyncContext) -> fuso_api::Result<()> {
- let (cx_1, cx_2) = split(cx);
- let cx_1 = cx_1.read().await;
- let handler = &cx_1.config.handler;
+ #[inline]
+ async fn dispatch(
+ self,
+ handlers: Arc>>>>,
+ cx: C,
+ ) -> fuso_api::Result<()> {
+ let mut io = self.roll();
+
+ io.begin().await?;
+
+ for handle in handlers.iter() {
+ let handle = handle.clone();
+
+ match handle.dispose(io.clone(), cx.clone()).await? {
+ State::Accept(()) => return Ok(()),
+ State::Next => {
+ io.back().await?;
+ log::debug!("[dispatch] Next handler, rollback");
+ }
+ }
+ }
+
+ log::warn!(
+ "Can't find a suitable processor {}",
+ io.peer_addr().unwrap()
+ );
+
+ let _ = io.close().await;
+
+ Err(fuso_api::ErrorKind::UnHandler.into())
+ }
+}
- let io = self.roll();
+
+#[async_trait]
+impl StrategyEx>>>>, C> for TcpStream
+where
+ C: Clone + Send + Sync + 'static,
+{
+ #[inline]
+ async fn select(
+ self,
+ strategys: Arc>>>>,
+ cx: C,
+ ) -> fuso_api::Result {
+ let mut io = self.roll();
io.begin().await?;
- for handle in handler.iter() {
+ for handle in strategys.iter() {
let handle = handle.clone();
- match handle.dispose(io.clone(), cx_2.clone()).await? {
- State::Accept => return Ok(()),
+ match handle.dispose(io.clone(), cx.clone()).await? {
+ State::Accept(action) => return Ok(action),
State::Next => {
io.back().await?;
log::debug!("[dispatch] Next handler, rollback");
@@ -47,6 +98,13 @@ where
}
}
+ log::warn!(
+ "Can't find a suitable processor {}",
+ io.peer_addr().unwrap()
+ );
+
+ let _ = io.close().await;
+
Err(fuso_api::ErrorKind::UnHandler.into())
}
}
diff --git a/fuso-core/src/handler.rs b/fuso-core/src/handler.rs
index 2c7a616..5af8755 100644
--- a/fuso-core/src/handler.rs
+++ b/fuso-core/src/handler.rs
@@ -1,29 +1,80 @@
-use fuso_api::{Buffer, Rollback};
+use std::{pin::Pin, sync::Arc};
-use crate::{
- core::SyncContext,
- dispatch::{Handler, State},
-};
use async_trait::async_trait;
-pub struct ForwardHandler {}
+use futures::Future;
+use crate::dispatch::State;
+
+pub type DynFuture = dyn Future