Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LunarCrush feature source #60

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
11 changes: 11 additions & 0 deletions feature_sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,16 @@
KrakenKlineFeatureSource,
KrakenKlineField,
)
from .lunarcrush_time_series_feature_source import (
LunarCrushMetric,
LunarCrushTimeSeriesTopic,
LunarCrushTimeSeriesCategory,
LunarCrushTimeSeriesCoin,
LunarCrushTimeSeriesStock,
)
from .temporal_feature_source import TemporalFeatureSource
from .test_feature_source import TestFeatureSource
from .yahoo_finance_kline_feature_source import (
YahooFinanceKlineField,
YahooFinanceKlineFeatureSource,
)
55 changes: 34 additions & 21 deletions feature_sources/binance_kline_feature_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import statistics
import time
from time_util import current_interval_ms, time_span_ms
from urllib.parse import urlencode


class BinanceKlineField(IntEnum):
Expand All @@ -36,7 +37,8 @@ class BinanceKlineFeatureSource(FeatureSource):

_QUERY_LIMIT = 1000

# Must be in ascending order
_URL = "https://api.binance.com/api/v3/klines"

_INTERVALS = {
time_span_ms(minutes=1): "1m",
time_span_ms(minutes=3): "3m",
Expand Down Expand Up @@ -94,11 +96,16 @@ def __init__(
self.VALID_FEATURE_IDS = feature_ids
super().__init__(feature_ids, feature_dtypes, default_dtype)

self._symbol = symbol
query_parameters = {
"symbol": symbol,
"interval": query_interval,
"limit": self._QUERY_LIMIT,
}

self._source_interval_ms = source_interval_ms
self._query_interval = query_interval
self._feature_mappings = feature_mappings
self._fields = list(feature_mappings.values())
self._query_parameters = query_parameters
self._retries = retries
self._logger = getLogger(self.__class__.__name__)

Expand All @@ -112,10 +119,10 @@ def _convert_samples(self, data_rows: list[list]) -> None:
for row in data_rows:
self._convert_sample(row)

# TODO: Examine moving this functionality into the FeatureSource base class
# TODO: Examine moving this into FeatureSource base class
def _compact_samples(self, samples: list[list]) -> list:
result = samples[-1].copy()
for field in BinanceKlineField:
for field in self._fields:
compaction = self._FIELD_COMPACTIONS.get(field, FeatureCompaction.LAST)
if compaction == FeatureCompaction.LAST:
continue
Expand Down Expand Up @@ -148,28 +155,37 @@ def get_feature_samples(
_OPEN_TIME = BinanceKlineField.OPEN_TIME

# Binance uses open time for queries
open_start_time_ms = start_time_ms - interval_ms
query_start_time_ms = start_time_ms - interval_ms

# Align on interval so queries for 1 sample include at least 1 sample
open_start_time_ms = current_interval_ms(
open_start_time_ms, self._source_interval_ms
query_start_time_ms = current_interval_ms(
query_start_time_ms, self._source_interval_ms
)

open_end_time_ms = start_time_ms + (interval_ms * (sample_count - 2))
end_time_ms = start_time_ms + (interval_ms * (sample_count - 1))

query_parameters = self._query_parameters.copy()
data_rows = []
retries = self._retries
# Loop for pagination
while True:
url = (
"https://api.binance.com/api/v3/klines"
f"?symbol={self._symbol}&interval={self._query_interval}"
f"&startTime={open_start_time_ms}&endTime={open_end_time_ms}"
f"&limit={self._QUERY_LIMIT}"
while query_start_time_ms < end_time_ms:
page_sample_count = (
end_time_ms - query_start_time_ms
) / self._source_interval_ms
page_sample_count = int(min(page_sample_count, self._QUERY_LIMIT))

if page_sample_count < 1:
break

query_end_time_ms = query_start_time_ms + (
self._source_interval_ms * (page_sample_count - 1)
)

query_parameters["startTime"] = str(query_start_time_ms)
query_parameters["endTime"] = str(query_end_time_ms)
url = self._URL + "?" + urlencode(query_parameters)

success = False
response_row_count = 0
# Loop for retries
while True:
try:
Expand All @@ -189,7 +205,6 @@ def get_feature_samples(
)
else:
response_rows = response.json()
response_row_count = len(response_rows)
data_rows.extend(response_rows)
success = True

Expand All @@ -208,11 +223,9 @@ def get_feature_samples(
retries -= 1
time.sleep(self.RETRY_DELAY)

if response_row_count != self._QUERY_LIMIT:
break

open_start_time_ms = data_rows[-1][_OPEN_TIME] + self._source_interval_ms
query_start_time_ms = query_end_time_ms + self._source_interval_ms

# TODO: Examine moving the rest of this function into FeatureSource base class
row_count = len(data_rows)
if row_count == 0:
raise RuntimeError("No samples received.")
Expand Down
59 changes: 33 additions & 26 deletions feature_sources/bybit_kline_feature_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import statistics
import time
from time_util import current_interval_ms, time_span_ms
from urllib.parse import urlencode


class BybitKlineField(IntEnum):
Expand All @@ -31,7 +32,8 @@ class BybitKlineFeatureSource(FeatureSource):

_QUERY_LIMIT = 1000

# Must be in ascending order
_URL = "https://api.bybit.com/v5/market/kline"

_INTERVALS = {
time_span_ms(minutes=1): "1",
time_span_ms(minutes=3): "3",
Expand Down Expand Up @@ -84,12 +86,17 @@ def __init__(
self.VALID_FEATURE_IDS = feature_ids
super().__init__(feature_ids, feature_dtypes, default_dtype)

self._category = category
self._symbol = symbol
query_parameters = {
"category": category,
"symbol": symbol,
"interval": query_interval,
"limit": self._QUERY_LIMIT,
}

self._source_interval_ms = source_interval_ms
self._query_interval = query_interval
self._feature_mappings = feature_mappings
self._fields = list(feature_mappings.values())
self._query_parameters = query_parameters
self._retries = retries
self._logger = getLogger(self.__class__.__name__)

Expand All @@ -107,7 +114,7 @@ def _convert_samples(self, data_rows: list[list]) -> None:

def _compact_samples(self, samples: list[list]) -> list:
result = samples[-1].copy()
for field in BybitKlineField:
for field in self._fields:
compaction = self._FIELD_COMPACTIONS.get(field, FeatureCompaction.LAST)
if compaction == FeatureCompaction.LAST:
continue
Expand Down Expand Up @@ -140,30 +147,36 @@ def get_feature_samples(
_OPEN_TIME = BybitKlineField.OPEN_TIME

# Bybit uses open time for queries
open_start_time_ms = start_time_ms - interval_ms
query_start_time_ms = start_time_ms - interval_ms

# Align on interval so queries for 1 sample include at least 1 sample
open_start_time_ms = current_interval_ms(
open_start_time_ms, self._source_interval_ms
query_start_time_ms = current_interval_ms(
query_start_time_ms, self._source_interval_ms
)

end_time_ms = start_time_ms + (interval_ms * (sample_count - 1))

query_parameters = self._query_parameters.copy()
data_rows = []
retries = self._retries
samples_left = sample_count
# Loop for pagination
while True:
page_sample_count = min(samples_left, self._QUERY_LIMIT)
open_end_time_ms = open_start_time_ms + (
interval_ms * (page_sample_count - 1)
)
while query_start_time_ms < end_time_ms:
page_sample_count = (
end_time_ms - query_start_time_ms
) / self._source_interval_ms
page_sample_count = int(min(page_sample_count, self._QUERY_LIMIT))

url = (
"https://api.bybit.com/v5/market/kline"
f"?category={self._category}&symbol={self._symbol}"
f"&start={open_start_time_ms}&end={open_end_time_ms}"
f"&interval={self._query_interval}&limit={self._QUERY_LIMIT}"
if page_sample_count < 1:
break

query_end_time_ms = query_start_time_ms + (
self._source_interval_ms * (page_sample_count - 1)
)

query_parameters["start"] = str(query_start_time_ms)
query_parameters["end"] = str(query_end_time_ms)
url = self._URL + "?" + urlencode(query_parameters)

success = False
# Loop for retries
while True:
Expand Down Expand Up @@ -200,13 +213,7 @@ def get_feature_samples(
retries -= 1
time.sleep(self.RETRY_DELAY)

samples_left -= page_sample_count
if samples_left <= 0:
break

open_start_time_ms = (
int(data_rows[-1][_OPEN_TIME]) + self._source_interval_ms
)
query_start_time_ms = query_end_time_ms + self._source_interval_ms

row_count = len(data_rows)
if row_count == 0:
Expand Down
28 changes: 14 additions & 14 deletions feature_sources/coin_metrics_feature_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,13 @@ def _convert_metric(self, metric: str, value):
value = float(value)
return value

def _convert_sample(self, sample: dict) -> dict:
results = {}
def _convert_sample(self, sample: dict) -> None:
for metric in self._convert_metrics:
results[metric] = self._convert_metric(metric, sample[metric])
return results
sample[metric.value] = self._convert_metric(metric, sample[metric])

def _convert_samples(self, data_rows: list[dict]) -> list[dict]:
return [self._convert_sample(row) for row in data_rows]
def _convert_samples(self, data_rows: list[dict]) -> None:
for row in data_rows:
self._convert_sample(row)

def _compact_samples(self, samples: list[dict]) -> dict:
result = samples[-1].copy()
Expand Down Expand Up @@ -216,19 +215,22 @@ def get_feature_samples(
interval_ms: int,
sample_count: int,
) -> dict[FeatureID, ndarray]:
query_start_time_ms = start_time_ms
_OPEN_TIME = CoinMetric.TIME

# Coin Metrics uses open time for queries
query_start_time_ms = start_time_ms - interval_ms

# Align on interval so queries for 1 sample include at least 1 sample
query_start_time_ms = current_interval_ms(
query_start_time_ms, self._source_interval_ms
)

end_time_ms = start_time_ms + (interval_ms * (sample_count - 1))

# Times must be preformatted because Coin Metrics rejects timestamps that
# carry the ISO 8601 UTC offset suffix ("+00:00"), and their Python
# library does not format times the way their API expects
start_time = datetime.fromtimestamp_ms(query_start_time_ms)
# TODO: Subtract 1 from sample_count?
end_time_ms = start_time_ms + (interval_ms * sample_count)
end_time = datetime.fromtimestamp_ms(end_time_ms)
start_time_string = start_time.to_iso8601_string()
end_time_string = end_time.to_iso8601_string()
Expand All @@ -240,9 +242,7 @@ def get_feature_samples(
if row_count == 0:
raise RuntimeError("No samples received.")

# TODO: Change to inplace conversion?
converted_samples = self._convert_samples(data_rows)

self._convert_samples(data_rows)
feature_samples = self._create_feature_samples(sample_count)

sample_time_ms = start_time_ms
Expand All @@ -252,8 +252,8 @@ def get_feature_samples(
compact_samples = self._compact_samples
for sample_index in range(sample_count):
while True:
row = converted_samples[row_index]
row_time_ms = row[CoinMetric.TIME]
row = data_rows[row_index]
row_time_ms = row[_OPEN_TIME] + self._source_interval_ms
if row_time_ms > sample_time_ms:
break
interval_rows.append(row)
Expand Down
Loading