exchange_rate: historical rates: merge old+new data, don't overwrite
- CoinGecko restricted its historical API to last 365 days - we used to ask for, and get, the whole history, but now we can only ask for the last year - so change HTTP request to only ask for 365 days - we cache historical rates to disk - previously we used to overwrite what is already stored, with the newly obtained data - now this is changed so that we merge the new data into the already stored data
This commit is contained in:
@@ -8,7 +8,7 @@ import time
|
|||||||
import csv
|
import csv
|
||||||
import decimal
|
import decimal
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from typing import Sequence, Optional, Mapping, Dict, Union, Any
|
from typing import Sequence, Optional, Mapping, Dict, Union, Any, Tuple
|
||||||
|
|
||||||
from aiorpcx.curio import timeout_after, TaskTimeout, ignore_after
|
from aiorpcx.curio import timeout_after, TaskTimeout, ignore_after
|
||||||
import aiohttp
|
import aiohttp
|
||||||
@@ -97,30 +97,49 @@ class ExchangeBase(Logger):
|
|||||||
self._quotes_timestamp = time.time()
|
self._quotes_timestamp = time.time()
|
||||||
self.on_quotes(received_new_data=True)
|
self.on_quotes(received_new_data=True)
|
||||||
|
|
||||||
def read_historical_rates(self, ccy: str, cache_dir: str) -> Optional[dict]:
|
@staticmethod
|
||||||
filename = os.path.join(cache_dir, self.name() + '_'+ ccy)
|
def _read_historical_rates_from_file(
|
||||||
|
*, exchange_name: str, ccy: str, cache_dir: str,
|
||||||
|
) -> Tuple[Optional[dict], Optional[float]]:
|
||||||
|
filename = os.path.join(cache_dir, f"{exchange_name}_{ccy}")
|
||||||
if not os.path.exists(filename):
|
if not os.path.exists(filename):
|
||||||
return None
|
return None, None
|
||||||
timestamp = os.stat(filename).st_mtime
|
timestamp = os.stat(filename).st_mtime
|
||||||
try:
|
try:
|
||||||
with open(filename, 'r', encoding='utf-8') as f:
|
with open(filename, 'r', encoding='utf-8') as f:
|
||||||
h = json.loads(f.read())
|
h = json.loads(f.read())
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None, None
|
||||||
if not h: # e.g. empty dict
|
if not h: # e.g. empty dict
|
||||||
return None
|
return None, None
|
||||||
# cast rates to str
|
# cast rates to str
|
||||||
h = {date_str: str(rate) for (date_str, rate) in h.items()}
|
h = {date_str: str(rate) for (date_str, rate) in h.items()}
|
||||||
|
return h, timestamp
|
||||||
|
|
||||||
|
def read_historical_rates(self, ccy: str, cache_dir: str) -> Optional[dict]:
|
||||||
|
h, timestamp = self._read_historical_rates_from_file(
|
||||||
|
exchange_name=self.name(),
|
||||||
|
ccy=ccy,
|
||||||
|
cache_dir=cache_dir,
|
||||||
|
)
|
||||||
h['timestamp'] = timestamp
|
h['timestamp'] = timestamp
|
||||||
self._history[ccy] = h
|
self._history[ccy] = h
|
||||||
self.on_history()
|
self.on_history()
|
||||||
return h
|
return h
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _write_historical_rates_to_file(
|
||||||
|
*, exchange_name: str, ccy: str, cache_dir: str, history: Dict[str, str],
|
||||||
|
) -> None:
|
||||||
|
filename = os.path.join(cache_dir, f"{exchange_name}_{ccy}")
|
||||||
|
with open(filename, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(json.dumps(history))
|
||||||
|
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def get_historical_rates_safe(self, ccy: str, cache_dir: str) -> None:
|
async def get_historical_rates_safe(self, ccy: str, cache_dir: str) -> None:
|
||||||
try:
|
try:
|
||||||
self.logger.info(f"requesting fx history for {ccy}")
|
self.logger.info(f"requesting fx history for {ccy}")
|
||||||
h = await self.request_history(ccy)
|
h_new = await self.request_history(ccy)
|
||||||
self.logger.info(f"received fx history for {ccy}")
|
self.logger.info(f"received fx history for {ccy}")
|
||||||
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as e:
|
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as e:
|
||||||
self.logger.info(f"failed fx history: {repr(e)}")
|
self.logger.info(f"failed fx history: {repr(e)}")
|
||||||
@@ -129,10 +148,16 @@ class ExchangeBase(Logger):
|
|||||||
self.logger.exception(f"failed fx history: {repr(e)}")
|
self.logger.exception(f"failed fx history: {repr(e)}")
|
||||||
return
|
return
|
||||||
# cast rates to str
|
# cast rates to str
|
||||||
h = {date_str: str(rate) for (date_str, rate) in h.items()}
|
h_new = {date_str: str(rate) for (date_str, rate) in h_new.items()}
|
||||||
filename = os.path.join(cache_dir, self.name() + '_' + ccy)
|
# merge old history and new history. resolve duplicate dates using new data.
|
||||||
with open(filename, 'w', encoding='utf-8') as f:
|
h_old, _timestamp = self._read_historical_rates_from_file(
|
||||||
f.write(json.dumps(h))
|
exchange_name=self.name(), ccy=ccy, cache_dir=cache_dir,
|
||||||
|
)
|
||||||
|
h = {**h_old, **h_new}
|
||||||
|
# write merged data to disk cache
|
||||||
|
self._write_historical_rates_to_file(
|
||||||
|
exchange_name=self.name(), ccy=ccy, cache_dir=cache_dir, history=h,
|
||||||
|
)
|
||||||
h['timestamp'] = time.time()
|
h['timestamp'] = time.time()
|
||||||
self._history[ccy] = h
|
self._history[ccy] = h
|
||||||
self.on_history()
|
self.on_history()
|
||||||
@@ -352,8 +377,12 @@ class CoinGecko(ExchangeBase):
|
|||||||
return CURRENCIES[self.name()]
|
return CURRENCIES[self.name()]
|
||||||
|
|
||||||
async def request_history(self, ccy):
|
async def request_history(self, ccy):
|
||||||
|
num_days = 365
|
||||||
|
# Setting `num_days = "max"` started erroring (around 2024-04) with:
|
||||||
|
# > Your request exceeds the allowed time range. Public API users are limited to querying
|
||||||
|
# > historical data within the past 365 days. Upgrade to a paid plan to enjoy full historical data access
|
||||||
history = await self.get_json('api.coingecko.com',
|
history = await self.get_json('api.coingecko.com',
|
||||||
'/api/v3/coins/bitcoin/market_chart?vs_currency=%s&days=max' % ccy)
|
f"/api/v3/coins/bitcoin/market_chart?vs_currency={ccy}&days={num_days}")
|
||||||
|
|
||||||
return dict([(timestamp_to_datetime(h[0]/1000, utc=True).strftime('%Y-%m-%d'), str(h[1]))
|
return dict([(timestamp_to_datetime(h[0]/1000, utc=True).strftime('%Y-%m-%d'), str(h[1]))
|
||||||
for h in history['prices']])
|
for h in history['prices']])
|
||||||
|
|||||||
Reference in New Issue
Block a user