Skip to content

Commit

Permalink
[bybit-stream] add ChannelInactiveHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
rizer1980 committed Dec 24, 2024
1 parent 262d402 commit c410d69
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,50 +54,6 @@ BybitResult<BybitOrderDetails<BybitOrderDetail>> getBybitOrder(
return order;
}

// BybitResult<BybitOrderResponse> placeMarketOrder(
// BybitCategory category, String symbol, BybitSide side, BigDecimal qty, String orderLinkId)
// throws IOException {
// BybitPlaceOrderPayload payload =
// new BybitPlaceOrderPayload(category, symbol, side, MARKET, qty, orderLinkId);
// BybitResult<BybitOrderResponse> placeOrder =
// decorateApiCall(
// () -> bybitAuthenticated.placeMarketOrder(apiKey, signatureCreator, nonceFactory,
// payload))
// .withRateLimiter(getCreateOrderRateLimiter(category))
// .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER))
// .call();
// if (!placeOrder.isSuccess()) {
// throw createBybitExceptionFromResult(placeOrder);
// }
// return placeOrder;
// }

//BybitResult<BybitOrderResponse> placeLimitOrder(
// BybitCategory category,
// String symbol,
// BybitSide side,
// BigDecimal qty,
// BigDecimal limitPrice,
// String orderLinkId,
// boolean reduceOnly)
// throws IOException {
// BybitPlaceOrderPayload payload =
// new BybitPlaceOrderPayload(
// category, symbol, side, BybitOrderType.LIMIT, qty, orderLinkId, limitPrice);
// payload.setReduceOnly(String.valueOf(reduceOnly));
// BybitResult<BybitOrderResponse> placeOrder =
// decorateApiCall(
// () -> bybitAuthenticated.placeLimitOrder(apiKey, signatureCreator, nonceFactory,
// payload))
// .withRateLimiter(getCreateOrderRateLimiter(category))
// .withRateLimiter(rateLimiter(GLOBAL_RATE_LIMITER))
// .call();
// if (!placeOrder.isSuccess()) {
// throw createBybitExceptionFromResult(placeOrder);
// }
// return placeOrder;
// }

BybitResult<BybitOrderResponse> amendOrder(
BybitCategory category,
String symbol,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.reactivex.rxjava3.core.Completable;
import org.knowm.xchange.bybit.BybitExchange;
import org.knowm.xchange.bybit.dto.BybitCategory;
Expand Down Expand Up @@ -95,4 +96,13 @@ public BybitStreamingTradeService getStreamingTradeService() {
return streamingTradeService;
}

/**
* Enables the user to listen on channel inactive events and react appropriately.
*
* @param channelInactiveHandler a WebSocketMessageHandler instance.
*/
public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
streamingService.setChannelInactiveHandler(channelInactiveHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dto.BybitSubscribeMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class BybitStreamingService extends JsonNettyStreamingService {
private final Observable<Long> pingPongSrc = Observable.interval(15, 20, TimeUnit.SECONDS);
private Disposable pingPongSubscription;
private final ExchangeSpecification spec;
private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;
@Getter private boolean isAuthorized = false;

public BybitStreamingService(String apiUrl, ExchangeSpecification spec) {
Expand Down Expand Up @@ -158,4 +160,9 @@ public void pingPongDisconnectIfConnected() {
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE;
}

public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
this.channelInactiveHandler = channelInactiveHandler;
}
}

0 comments on commit c410d69

Please sign in to comment.