Skip to content

Commit

Permalink
Merge remote-tracking branch 'github/develop' into bybit-v5
Browse files Browse the repository at this point in the history
  • Loading branch information
bigscoop committed Nov 13, 2023
2 parents 221f6c5 + d56e61f commit 0a53ff0
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 39 deletions.
2 changes: 1 addition & 1 deletion xchange-binance/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</appender>


<root level="DEBUG">
<root level="WARN">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/** @author Jean-Christophe Laruelle */
public class KrakenFuturesAdapters {

private static final String MULTI_COLLATERAL_PRODUCTS = "pf_";
private static final String MULTI_COLLATERAL_PRODUCTS = "PF_";
private static final String ACCOUNT_TYPE = "multiCollateralMarginAccount";

public static Ticker adaptTicker(
Expand Down Expand Up @@ -183,7 +183,7 @@ public static ExchangeMetaData adaptInstrumentsMetaData(KrakenFuturesInstruments
Map<Currency, CurrencyMetaData> currencies = new HashMap<>();

for (KrakenFuturesInstrument instrument : krakenFuturesInstruments.getInstruments()) {
if(instrument.getSymbol().contains("pf")){
if(instrument.getSymbol().contains(MULTI_COLLATERAL_PRODUCTS)){
instruments.put(adaptInstrument(instrument.getSymbol()),new InstrumentMetaData.Builder()
.volumeScale(instrument.getVolumeScale())
.priceScale(instrument.getTickSize().scale())
Expand All @@ -198,7 +198,7 @@ public static ExchangeMetaData adaptInstrumentsMetaData(KrakenFuturesInstruments

public static Instrument adaptInstrument(String symbol) {
String main_symbol = symbol.replace(MULTI_COLLATERAL_PRODUCTS,"");
return new FuturesContract(new CurrencyPair(main_symbol.substring(0, main_symbol.length() - 3).replace("xbt","btc")+"/"+main_symbol.substring(main_symbol.length()-3)),"PERP");
return new FuturesContract(new CurrencyPair(main_symbol.substring(0, main_symbol.length() - 3).replace("XBT","BTC")+"/"+main_symbol.substring(main_symbol.length()-3)),"PERP");
}

private static BigDecimal getMinimumAmountFromVolumeScale(Integer volumeScale){
Expand All @@ -215,7 +215,7 @@ private static BigDecimal getMinimumAmountFromVolumeScale(Integer volumeScale){
}

public static String adaptKrakenFuturesSymbol(Instrument instrument) {
return MULTI_COLLATERAL_PRODUCTS+instrument.getBase().toString().replace("BTC","XBT").toLowerCase()+instrument.getCounter().toString().toLowerCase();
return MULTI_COLLATERAL_PRODUCTS+instrument.getBase().toString().replace("BTC","XBT")+instrument.getCounter().toString();
}

public static Trades adaptTrades(KrakenFuturesPublicFills krakenFuturesTrades, Instrument instrument) {
Expand Down
2 changes: 1 addition & 1 deletion xchange-okex/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</appender>


<root level="DEBUG">
<root level="WARN">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package info.bitrich.xchangestream.binance;

import static java.util.Collections.emptyMap;

import info.bitrich.xchangestream.binance.BinanceUserDataChannel.NoActiveChannelException;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.service.netty.ConnectionStateModel.State;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import info.bitrich.xchangestream.util.Events;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.knowm.xchange.binance.BinanceExchange;
Expand All @@ -17,14 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange {

private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingExchange.class);
Expand Down Expand Up @@ -322,4 +322,14 @@ public void enableLiveSubscription() {
public void disableLiveSubscription() {
if (this.streamingService != null) this.streamingService.disableLiveSubscription();
}

/**
* Enables the user to listen on channel inactive events and react appropriately.
*
* @param channelInactiveHandler a WebSocketMessageHandler instance.
*/
public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
streamingService.setChannelInactiveHandler(channelInactiveHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -20,6 +20,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinanceStreamingService extends JsonNettyStreamingService {

Expand All @@ -32,6 +34,9 @@ public class BinanceStreamingService extends JsonNettyStreamingService {
private final KlineSubscription klineSubscription;

private boolean isLiveSubscriptionEnabled = false;

private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;

private final Map<Integer, BinanceWebSocketSubscriptionMessage> liveSubscriptionMessage =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -236,4 +241,39 @@ public void unsubscribeChannel(final String channelId) {
}
}
}

@Override
protected WebSocketClientHandler getWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) {
LOGGER.info("Registering BinanceWebSocketClientHandler");
return new BinanceWebSocketClientHandler(handshake, handler);
}

public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
this.channelInactiveHandler = channelInactiveHandler;
}

/**
* Custom client handler in order to execute an external, user-provided handler on channel events.
*/
class BinanceWebSocketClientHandler extends NettyWebSocketClientHandler {

public BinanceWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) {
super(handshake, handler);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
if (channelInactiveHandler != null)
channelInactiveHandler.onMessage("WebSocket Client disconnected!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public static OrderBook adaptKrakenFuturesSnapshot(KrakenFuturesStreamingOrderBo
List<LimitOrder> asks = new ArrayList<>();
List<LimitOrder> bids = new ArrayList<>();

snapshot.getBids().forEach(krakenFuturesSnapShotOrder -> bids.add(new LimitOrder.Builder(Order.OrderType.BID, KrakenFuturesAdapters.adaptInstrument(snapshot.getProduct_id().toLowerCase()))
snapshot.getBids().forEach(krakenFuturesSnapShotOrder -> bids.add(new LimitOrder.Builder(Order.OrderType.BID, KrakenFuturesAdapters.adaptInstrument(snapshot.getProduct_id()))
.limitPrice(krakenFuturesSnapShotOrder.getPrice())
.originalAmount(krakenFuturesSnapShotOrder.getQuantity())
.build()));
snapshot.getAsks().forEach(krakenFuturesSnapShotOrder -> asks.add(new LimitOrder.Builder(Order.OrderType.ASK, KrakenFuturesAdapters.adaptInstrument(snapshot.getProduct_id().toLowerCase()))
snapshot.getAsks().forEach(krakenFuturesSnapShotOrder -> asks.add(new LimitOrder.Builder(Order.OrderType.ASK, KrakenFuturesAdapters.adaptInstrument(snapshot.getProduct_id()))
.limitPrice(krakenFuturesSnapShotOrder.getPrice())
.originalAmount(krakenFuturesSnapShotOrder.getQuantity())
.build()));
Expand All @@ -35,7 +35,7 @@ public static OrderBook adaptKrakenFuturesSnapshot(KrakenFuturesStreamingOrderBo

public static Ticker adaptTicker(KrakenFuturesStreamingTickerResponse tickerResponse) {
return new Ticker.Builder()
.instrument(KrakenFuturesAdapters.adaptInstrument(tickerResponse.getProduct_id().toLowerCase()))
.instrument(KrakenFuturesAdapters.adaptInstrument(tickerResponse.getProduct_id()))
.ask(tickerResponse.getAsk())
.bid(tickerResponse.getBid())
.last(tickerResponse.getLast())
Expand All @@ -50,7 +50,7 @@ public static Ticker adaptTicker(KrakenFuturesStreamingTickerResponse tickerResp

public static FundingRate adaptFundingRate(KrakenFuturesStreamingTickerResponse tickerResponse) {
return new FundingRate.Builder()
.instrument(KrakenFuturesAdapters.adaptInstrument(tickerResponse.getProduct_id().toLowerCase()))
.instrument(KrakenFuturesAdapters.adaptInstrument(tickerResponse.getProduct_id()))
.fundingRate1h(tickerResponse.getRelative_funding_rate())
.fundingRate8h((tickerResponse.getRelative_funding_rate() == null)
? null
Expand All @@ -62,7 +62,7 @@ public static FundingRate adaptFundingRate(KrakenFuturesStreamingTickerResponse
public static Trade adaptTrade(KrakenFuturesStreamingTradeResponse trade) {
return new Trade.Builder()
.price(trade.getPrice())
.instrument(KrakenFuturesAdapters.adaptInstrument(trade.getProduct_id().toLowerCase()))
.instrument(KrakenFuturesAdapters.adaptInstrument(trade.getProduct_id()))
.timestamp(trade.getTime())
.type((trade.getSide().equals(KrakenFuturesStreamingOrderBookDeltaResponse.KrakenFuturesStreamingSide.sell) ? Order.OrderType.ASK : Order.OrderType.BID))
.id(trade.getUid())
Expand All @@ -82,7 +82,7 @@ public static List<UserTrade> adaptUserTrades(KrakenFuturesStreamingFillsDeltaRe
.feeCurrency(new Currency(krakenFuturesStreamingFill.getFee_currency()))
.feeAmount(krakenFuturesStreamingFill.getFee_paid())
.type((krakenFuturesStreamingFill.isBuy()) ? Order.OrderType.BID : Order.OrderType.ASK)
.instrument(KrakenFuturesAdapters.adaptInstrument(krakenFuturesStreamingFill.getInstrument().toLowerCase()))
.instrument(KrakenFuturesAdapters.adaptInstrument(krakenFuturesStreamingFill.getInstrument()))
.timestamp(krakenFuturesStreamingFill.getTime())
.build()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Observable<Ticker> getTicker(Instrument instrument, Object... args) {

return service.subscribeChannel(channelName)
.filter(message-> message.has("feed") && message.has("product_id"))
.filter(message-> message.get("product_id").asText().toLowerCase().equals(KrakenFuturesAdapters.adaptKrakenFuturesSymbol(instrument)))
.filter(message-> message.get("product_id").asText().equals(KrakenFuturesAdapters.adaptKrakenFuturesSymbol(instrument)))
.map(message-> KrakenFuturesStreamingAdapters.adaptTicker(objectMapper.treeToValue(message, KrakenFuturesStreamingTickerResponse.class)));
}

Expand All @@ -83,7 +83,7 @@ public Observable<FundingRate> getFundingRate(Instrument instrument, Object... a

return service.subscribeChannel(channelName)
.filter(message-> message.has("feed") && message.has("product_id"))
.filter(message-> message.get("product_id").asText().toLowerCase().equals(KrakenFuturesAdapters.adaptKrakenFuturesSymbol(instrument)))
.filter(message-> message.get("product_id").asText().equals(KrakenFuturesAdapters.adaptKrakenFuturesSymbol(instrument)))
.map(message-> KrakenFuturesStreamingAdapters.adaptFundingRate(objectMapper.treeToValue(message, KrakenFuturesStreamingTickerResponse.class)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ protected String getChannelNameFromMessage(JsonNode message) {

if(message.has("feed") && message.has("product_id")){
if(message.get("feed").asText().contains(ORDERBOOK)){
channelName = ORDERBOOK+message.get("product_id").asText().toLowerCase();
channelName = ORDERBOOK+message.get("product_id").asText();
} else if(message.get("feed").asText().contains(TICKER)){
channelName = TICKER+message.get("product_id").asText().toLowerCase();
channelName = TICKER+message.get("product_id").asText();
} else if(message.get("feed").asText().contains(TRADES)){
channelName = TRADES+message.get("product_id").asText().toLowerCase();
channelName = TRADES+message.get("product_id").asText();
}
}
// Fills
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</appender>


<root level="DEBUG">
<root level="WARN">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.reactivex.Completable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
Expand Down Expand Up @@ -82,4 +83,14 @@ public StreamingTradeService getStreamingTradeService() {
public void useCompressedMessages(boolean compressedMessages) {
throw new NotYetImplementedForExchangeException("useCompressedMessage");
}

/**
* Enables the user to listen on channel inactive events and react appropriately.
*
* @param channelInactiveHandler a WebSocketMessageHandler instance.
*/
public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
streamingService.setChannelInactiveHandler(channelInactiveHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,13 @@
import info.bitrich.xchangestream.okex.dto.OkexLoginMessage;
import info.bitrich.xchangestream.okex.dto.OkexSubscribeMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
Expand All @@ -27,6 +20,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class OkexStreamingService extends JsonNettyStreamingService {

private static final Logger LOG = LoggerFactory.getLogger(OkexStreamingService.class);
Expand All @@ -45,6 +49,8 @@ public class OkexStreamingService extends JsonNettyStreamingService {

private final Observable<Long> pingPongSrc = Observable.interval(15, 15, TimeUnit.SECONDS);

private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;

private Disposable pingPongSubscription;

private final ExchangeSpecification xSpec;
Expand Down Expand Up @@ -165,6 +171,42 @@ private OkexSubscribeMessage.SubscriptionTopic getTopic(String channelName){
}
}

@Override
protected WebSocketClientHandler getWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) {
LOG.info("Registering OkxWebSocketClientHandler");
return new OkxWebSocketClientHandler(handshake, handler);
}

public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
this.channelInactiveHandler = channelInactiveHandler;
}

/**
* Custom client handler in order to execute an external, user-provided handler on channel events.
*/
class OkxWebSocketClientHandler extends NettyWebSocketClientHandler {

public OkxWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) {
super(handshake, handler);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
if (channelInactiveHandler != null) {
channelInactiveHandler.onMessage("WebSocket Client disconnected!");
}
}
}

public void pingPongDisconnectIfConnected() {
if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) {
pingPongSubscription.dispose();
Expand Down

0 comments on commit 0a53ff0

Please sign in to comment.