Add docstring for API

This commit is contained in:
Jules 2025-05-04 11:00:25 +02:00
parent 2f1c64b1c8
commit 760a13b19f
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
2 changed files with 131 additions and 53 deletions

View file

@ -8,7 +8,7 @@ import time
import urllib.parse
from datetime import datetime, timedelta
from statistics import mean
from typing import List, Tuple
from typing import List, Dict
from zoneinfo import ZoneInfo
import aiohttp
@ -37,43 +37,61 @@ class IrmKmiApiParametersError(IrmKmiApiError):
"""Exception to indicate a parameter error."""
def _api_key(method_name: str) -> str:
"""Get API key."""
return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest()
class IrmKmiApiClient:
"""API client for IRM KMI weather data"""
COORD_DECIMALS = 6
cache_max_age = 60 * 60 * 2 # Remove items from the cache if they have not been hit since 2 hours
cache = {}
_cache_max_age = 60 * 60 * 2 # Remove items from the cache if they have not been hit since 2 hours
_cache = {}
def __init__(self, session: aiohttp.ClientSession, user_agent: str) -> None:
self._session = session
self._base_url = "https://app.meteo.be/services/appv4/"
self._user_agent = user_agent
async def get_forecasts_coord(self, coord: dict) -> dict:
"""Get forecasts for given city."""
async def get_forecasts_coord(self, coord: Dict[str, float | int]) -> dict:
"""
Get forecasts for given location.
:param coord: dict with the following keys: 'lat', 'long' (both float or int)
:return: raw forecast as python dict
:raise: IrmKmiApiError when communication with the API fails
"""
assert 'lat' in coord
assert 'long' in coord
coord['lat'] = round(coord['lat'], self.COORD_DECIMALS)
coord['long'] = round(coord['long'], self.COORD_DECIMALS)
response: bytes = await self._api_wrapper(params={"s": "getForecasts", "k": _api_key("getForecasts")} | coord)
response: bytes = await self._api_wrapper(
params={"s": "getForecasts", "k": self._api_key("getForecasts")} | coord
)
response: dict = json.loads(response)
_LOGGER.debug(f"Observation for {response.get('cityName', '')}: {response.get('obs', '{}')}")
_LOGGER.debug(f"Full data: {response}")
return response
async def get_image(self, url, params: dict | None = None) -> bytes:
"""Get the image at the specified url with the parameters"""
async def get_image(self, url, params: Dict[str, str] | None = None) -> bytes:
"""
Get the image at the specified url with the parameters
:param url: URL to fetch
:param params: query parameters to add to the request
:return: response body as bytes
:raise: IrmKmiApiError when communication with the API fails
"""
r: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params)
return r
async def get_svg(self, url, params: dict | None = None) -> str:
"""Get SVG as str at the specified url with the parameters"""
async def get_svg(self, url, params: Dict[str, str] | None = None) -> str:
"""
Get SVG as str at the specified url with the parameters
:param url: URL to fetch
:param params: query parameters to add to the request
:return: request body decoded as utf-8 str
:raise: IrmKmiApiError when communication with the API fails
"""
r: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params)
return r.decode()
@ -94,8 +112,8 @@ class IrmKmiApiClient:
else:
headers['User-Agent'] = self._user_agent
if url in self.cache:
headers['If-None-Match'] = self.cache[url]['etag']
if url in self._cache:
headers['If-None-Match'] = self._cache[url]['etag']
try:
async with async_timeout.timeout(60):
@ -110,13 +128,13 @@ class IrmKmiApiClient:
if response.status == 304:
_LOGGER.debug(f"Cache hit for {url}")
self.cache[url]['timestamp'] = time.time()
return self.cache[url]['response']
self._cache[url]['timestamp'] = time.time()
return self._cache[url]['response']
if 'ETag' in response.headers:
_LOGGER.debug(f"Saving in cache {url}")
r = await response.read()
self.cache[url] = {'etag': response.headers['ETag'], 'response': r, 'timestamp': time.time()}
self._cache[url] = {'etag': response.headers['ETag'], 'response': r, 'timestamp': time.time()}
return r
return await response.read()
@ -128,34 +146,66 @@ class IrmKmiApiClient:
except Exception as exception: # pylint: disable=broad-except
raise IrmKmiApiError(f"Something really wrong happened! {exception}") from exception
def expire_cache(self):
@staticmethod
def _api_key(method_name: str) -> str:
"""Get API key."""
return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest()
def expire_cache(self) -> None:
"""
Expire items from the cache which have not been accessed since self._cache_max_age (default 2h).
Must be called regularly to clear the cache.
"""
now = time.time()
keys_to_delete = set()
for key, value in self.cache.items():
if now - value['timestamp'] > self.cache_max_age:
for key, value in self._cache.items():
if now - value['timestamp'] > self._cache_max_age:
keys_to_delete.add(key)
for key in keys_to_delete:
del self.cache[key]
del self._cache[key]
_LOGGER.info(f"Expired {len(keys_to_delete)} elements from API cache")
class IrmKmiApiClientHa(IrmKmiApiClient):
"""API client for IRM KMI weather data with additional methods to integrate easily with Home Assistant"""
def __init__(self, session: aiohttp.ClientSession, user_agent: str, cdt_map: dict) -> None:
super().__init__(session, user_agent)
self._api_data = dict()
self._cdt_map = cdt_map
async def refresh_forecasts_coord(self, coord: dict) -> None:
async def refresh_forecasts_coord(self, coord: Dict[str, float | int]) -> None:
"""
Update the weather data by contacting the remote API. Keep the data in memory for future methods calls.
:param coord: dict with the following keys: 'lat', 'long' (both float or int)
:raise: IrmKmiApiError when communication with the API fails
"""
self._api_data = await self.get_forecasts_coord(coord)
def get_city(self) -> str | None:
"""
Get the city for which we currently have the forecast
:return: city name as str or None if unavailable
"""
return self._api_data.get('cityName', None)
def get_country(self) -> str | None:
"""
Get the two-letters country code for which we currently have the forecast
:return: country code as str or None if unavailable
"""
return self._api_data.get('country', None)
def get_current_weather(self, tz: ZoneInfo) -> CurrentWeatherData:
"""Parse the API data to build a CurrentWeatherData."""
"""
Parse the API data we currently have to build a CurrentWeatherData.
:param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels)
:return: current weather
"""
now_hourly = self._get_now_hourly(tz)
uv_index = self._get_uv_index()
@ -246,7 +296,13 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return now_hourly
def get_daily_forecast(self, tz: ZoneInfo, lang: str) -> List[IrmKmiForecast]:
"""Parse data from the API to create a list of daily forecasts"""
"""
Parse the API data we currently have to build the daily forecast list.
:param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels)
:param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en')
:return: chronologically ordered list of daily forecasts
"""
data = self._api_data.get('for', {}).get('daily')
if data is None or not isinstance(data, list) or len(data) == 0:
return []
@ -340,7 +396,12 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return forecasts
def get_hourly_forecast(self, tz: ZoneInfo) -> List[Forecast]:
"""Parse data from the API to create a list of hourly forecasts"""
"""
Parse data from the API to create a list of hourly forecasts
:param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels)
:return: chronologically ordered list of hourly forecasts
"""
data = self._api_data.get('for', {}).get('hourly')
if data is None or not isinstance(data, list) or len(data) == 0:
@ -392,7 +453,11 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return forecasts
def get_radar_forecast(self) -> List[IrmKmiRadarForecast]:
"""Create a list of short term forecasts for rain based on the data provided by the rain radar"""
"""
Create a list of short term forecasts for rain based on the data provided by the rain radar
:return: chronologically ordered list of 'few'-minutes radar forecasts
"""
data = self._api_data.get('animation', {})
if not isinstance(data, dict):
@ -420,14 +485,16 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
)
return forecast
def get_animation_data(self,
tz: ZoneInfo,
lang: str,
style: str,
dark_mode: bool
) -> RadarAnimationData:
"""From the API data passed in, call the API to get all the images and create the radar animation data object.
Frames from the API are merged with the background map and the location marker to create each frame."""
def get_animation_data(self, tz: ZoneInfo, lang: str, style: str, dark_mode: bool) -> RadarAnimationData:
"""
Get all the image URLs and create the radar animation data object.
:param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels)
:param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en')
:param style: style of the radar (key of STYLE_TO_PARAM_MAP)
:param dark_mode: true if dark mode
:return: animation data that can be used to download the images and build the rain graph animation
"""
animation_data = self._api_data.get('animation', {}).get('sequence')
localisation_layer_url = self._api_data.get('animation', {}).get('localisationLayer')
country = self.get_country()
@ -435,9 +502,9 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
if animation_data is None or localisation_layer_url is None or not isinstance(animation_data, list):
raise ValueError("Cannot create animation data")
localisation = self.merge_url_and_params(localisation_layer_url,
{'th': 'd' if country == 'NL' or not dark_mode else 'n'})
images_from_api = [self.merge_url_and_params(frame.get('uri'), {'rs': STYLE_TO_PARAM_MAP[style]})
localisation = self._merge_url_and_params(localisation_layer_url,
{'th': 'd' if country == 'NL' or not dark_mode else 'n'})
images_from_api = [self._merge_url_and_params(frame.get('uri'), {'rs': STYLE_TO_PARAM_MAP[style]})
for frame in animation_data if frame is not None and frame.get('uri') is not None
]
@ -472,7 +539,12 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return radar_animation
def get_warnings(self, lang: str) -> List[WarningData]:
"""Create a list of warning data instances based on the api data"""
"""
Parse the API data we currently have to build the list of warnings.
:param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en')
:return: unordered list of warnings
"""
warning_data = self._api_data.get('for', {}).get('warning')
if warning_data is None or not isinstance(warning_data, list) or len(warning_data) == 0:
return []
@ -506,8 +578,13 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return result if len(result) > 0 else []
async def get_pollen(self) -> dict:
"""Get SVG pollen info from the API, return the pollen data dict"""
async def get_pollen(self) -> Dict[str, str | None]:
"""
Get SVG pollen info from the API, return the pollen data dict
:return: pollen data as dict mapping from pollen name to pollen level as a color
:raise: IrmKmiApiError when communication with the API fails
"""
_LOGGER.debug("Getting pollen data from API")
svg_url = None
for module in self._api_data.get('module', []):
@ -529,7 +606,8 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return PollenParser(pollen_svg).get_pollen_data()
@staticmethod
def merge_url_and_params(url, params):
def _merge_url_and_params(url: str, params: dict) -> str:
"""Merge query string params in the URL"""
parsed_url = urllib.parse.urlparse(url)
query_params = urllib.parse.parse_qs(parsed_url.query)
query_params.update(params)

View file

@ -69,9 +69,9 @@ async def test_get_image_api_called() -> None:
def test_expire_cache_clears_items() -> None:
api = IrmKmiApiClient(session=MagicMock(), user_agent="test-user-agent")
assert api.cache_max_age == 60 * 60 * 2
assert api._cache_max_age == 60 * 60 * 2
api.cache = {
api._cache = {
'first-url': {
'timestamp': time.time() - timedelta(hours=3).seconds,
'response': 'wowo',
@ -84,12 +84,12 @@ def test_expire_cache_clears_items() -> None:
}
}
assert len(api.cache) == 2
assert len(api._cache) == 2
api.expire_cache()
assert len(api.cache) == 1
assert 'second-url' in api.cache
assert len(api._cache) == 1
assert 'second-url' in api._cache
async def test_api_wrapper_puts_response_in_cache() -> None:
@ -107,8 +107,8 @@ async def test_api_wrapper_puts_response_in_cache() -> None:
r = await api._api_wrapper(params={}, base_url='test-url')
assert r == b"response value"
assert len(api.cache) == 1
assert 'test-url' in api.cache
assert len(api._cache) == 1
assert 'test-url' in api._cache
session.request.assert_awaited_once_with(
method='get', url='test-url', headers={'User-Agent': 'test-user-agent'}, json=None, params={}
@ -126,7 +126,7 @@ async def test_api_wrapper_gets_response_from_cache() -> None:
session.request = AsyncMock(return_value=response)
api = IrmKmiApiClient(session=session, user_agent="test-user-agent")
api.cache = {
api._cache = {
'test-url': {
'timestamp': time.time(),
'response': b"response value",
@ -137,8 +137,8 @@ async def test_api_wrapper_gets_response_from_cache() -> None:
r = await api._api_wrapper(params={}, base_url='test-url')
assert r == b"response value"
assert len(api.cache) == 1
assert 'test-url' in api.cache
assert len(api._cache) == 1
assert 'test-url' in api._cache
session.request.assert_awaited_once_with(
method='get',