irm-kmi-ha/custom_components/irm_kmi/api.py

125 lines
4.4 KiB
Python

"""API Client for IRM KMI weather"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import socket
import time
from datetime import datetime
import aiohttp
import async_timeout
from .const import USER_AGENT
_LOGGER = logging.getLogger(__name__)
class IrmKmiApiError(Exception):
    """General IRM KMI API error.

    The more specific API exceptions of this module derive from this class.
    """
class IrmKmiApiCommunicationError(IrmKmiApiError):
    """API error signalling that communicating with the service failed."""
class IrmKmiApiParametersError(IrmKmiApiError):
    """API error signalling a problem with the request parameters."""
def _api_key(method_name: str) -> str:
"""Get API key."""
return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest()
class IrmKmiApiClient:
    """API client for IRM KMI weather data.

    Responses that carry an ``ETag`` header are stored in :attr:`cache` and
    revalidated with ``If-None-Match`` on later requests. ``cache`` is a class
    attribute, so the stored responses are shared by all client instances.
    """

    # Number of decimals kept when rounding coordinates before a request.
    COORD_DECIMALS = 6
    # Entries that have not been hit for two hours are dropped by expire_cache().
    cache_max_age = 60 * 60 * 2
    # Maps a cache key (see _cache_key) to {'etag', 'response', 'timestamp'}.
    cache = {}

    def __init__(self, session: aiohttp.ClientSession) -> None:
        """Keep the HTTP session that is used for every request."""
        self._session = session
        self._base_url = "https://app.meteo.be/services/appv4/"

    async def get_forecasts_coord(self, coord: dict) -> dict:
        """Get forecasts for the given coordinates.

        :param coord: mapping with at least the keys ``'lat'`` and ``'long'``;
            any other key is forwarded as a query parameter. The mapping
            itself is left untouched.
        :return: the JSON-decoded response body.
        :raises IrmKmiApiParametersError: if ``'lat'`` or ``'long'`` is missing.
        :raises IrmKmiApiError: if the request fails (see ``_api_wrapper``).
        """
        missing = [name for name in ('lat', 'long') if name not in coord]
        if missing:
            # Explicit check instead of `assert`, which is stripped under -O.
            raise IrmKmiApiParametersError(f"Missing coordinate key(s): {', '.join(missing)}")

        # Build a new dict so the caller's coordinates are not rounded in place.
        rounded = coord | {
            'lat': round(coord['lat'], self.COORD_DECIMALS),
            'long': round(coord['long'], self.COORD_DECIMALS),
        }
        response: bytes = await self._api_wrapper(
            params={"s": "getForecasts", "k": _api_key("getForecasts")} | rounded
        )
        return json.loads(response)

    async def get_image(self, url, params: dict | None = None) -> bytes:
        """Get the image at the specified url with the parameters"""
        return await self._api_wrapper(base_url=url, params={} if params is None else params)

    async def get_svg(self, url, params: dict | None = None) -> str:
        """Get SVG as str at the specified url with the parameters"""
        raw: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params)
        return raw.decode()

    @staticmethod
    def _cache_key(url: str, params: dict | None) -> str:
        """Return the cache key identifying a request to ``url`` with ``params``.

        Without parameters the key is the URL itself. Otherwise the parameters
        are appended in sorted order, so that requests to the same URL with
        different query parameters do not share a cache entry.
        """
        if not params:
            return url
        ordered = sorted(params.items(), key=lambda item: str(item[0]))
        query = "&".join(f"{name}={value}" for name, value in ordered)
        return f"{url}?{query}"

    async def _api_wrapper(
            self,
            params: dict,
            base_url: str | None = None,
            path: str = "",
            method: str = "get",
            data: dict | None = None,
            headers: dict | None = None,
    ) -> bytes:
        """Get information from the API.

        :param params: query parameters of the request.
        :param base_url: URL to use instead of the default API base URL.
        :param path: suffix appended to the base URL.
        :param method: HTTP method.
        :param data: payload sent as JSON body, if any.
        :param headers: extra request headers; the mapping is copied, not modified.
        :return: the raw response body, taken from the cache when the server
            answers ``304``.
        :raises IrmKmiApiCommunicationError: on timeout (60 seconds) or on a
            client/network error.
        :raises IrmKmiApiError: on any other failure.
        """
        url = f"{self._base_url if base_url is None else base_url}{path}"
        cache_key = self._cache_key(url, params)

        # Copy so that the headers dict passed by the caller is never modified.
        request_headers = {} if headers is None else dict(headers)
        request_headers['User-Agent'] = USER_AGENT

        cached = self.cache.get(cache_key)
        if cached is not None:
            request_headers['If-None-Match'] = cached['etag']

        try:
            async with async_timeout.timeout(60):
                response = await self._session.request(
                    method=method,
                    url=url,
                    headers=request_headers,
                    json=data,
                    params=params
                )
                response.raise_for_status()

                if response.status == 304:
                    if cached is None:
                        raise IrmKmiApiError("Received 304 but no response is cached for this request")
                    _LOGGER.debug("Cache hit for %s", url)
                    cached['timestamp'] = time.time()
                    # Store the entry again: it may have been expired from the
                    # cache while the request was in flight.
                    self.cache[cache_key] = cached
                    return cached['response']

                body = await response.read()
                if 'ETag' in response.headers:
                    _LOGGER.debug("Saving in cache %s", url)
                    self.cache[cache_key] = {
                        'etag': response.headers['ETag'],
                        'response': body,
                        'timestamp': time.time(),
                    }
                return body

        except asyncio.TimeoutError as exception:
            raise IrmKmiApiCommunicationError("Timeout error fetching information") from exception
        except (aiohttp.ClientError, socket.gaierror) as exception:
            raise IrmKmiApiCommunicationError("Error fetching information") from exception
        except Exception as exception:  # pylint: disable=broad-except
            raise IrmKmiApiError(f"Something really wrong happened! {exception}") from exception

    def expire_cache(self) -> None:
        """Remove the cache entries that were last hit more than ``cache_max_age`` seconds ago."""
        now = time.time()
        expired = [
            key for key, entry in self.cache.items()
            if now - entry['timestamp'] > self.cache_max_age
        ]
        for key in expired:
            del self.cache[key]
        _LOGGER.info("Expired %d elements from API cache", len(expired))