From 717f987083918d63b1e912dca82c1e97281a8617 Mon Sep 17 00:00:00 2001 From: Jules Dejaeghere Date: Sun, 4 May 2025 11:38:10 +0200 Subject: [PATCH] Add docstring and reorder methods --- irm_kmi_api/api.py | 348 +++++++++++++++++++------------------- irm_kmi_api/const.py | 3 + irm_kmi_api/pollen.py | 69 ++++---- irm_kmi_api/rain_graph.py | 200 +++++++++++++--------- irm_kmi_api/utils.py | 8 - tests/test_rain_graph.py | 48 +++--- 6 files changed, 359 insertions(+), 317 deletions(-) delete mode 100644 irm_kmi_api/utils.py diff --git a/irm_kmi_api/api.py b/irm_kmi_api/api.py index 56ae7f7..f8b7072 100644 --- a/irm_kmi_api/api.py +++ b/irm_kmi_api/api.py @@ -20,7 +20,6 @@ from .data import (AnimationFrameData, CurrentWeatherData, Forecast, IrmKmiForecast, IrmKmiRadarForecast, RadarAnimationData, WarningData) from .pollen import PollenParser -from .utils import next_weekday _LOGGER = logging.getLogger(__name__) @@ -37,7 +36,6 @@ class IrmKmiApiParametersError(IrmKmiApiError): """Exception to indicate a parameter error.""" - class IrmKmiApiClient: """API client for IRM KMI weather data""" COORD_DECIMALS = 6 @@ -95,6 +93,20 @@ class IrmKmiApiClient: r: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params) return r.decode() + def expire_cache(self) -> None: + """ + Expire items from the cache which have not been accessed since self._cache_max_age (default 2h). + Must be called regularly to clear the cache. + """ + now = time.time() + keys_to_delete = set() + for key, value in self._cache.items(): + if now - value['timestamp'] > self._cache_max_age: + keys_to_delete.add(key) + for key in keys_to_delete: + del self._cache[key] + _LOGGER.info(f"Expired {len(keys_to_delete)} elements from API cache") + async def _api_wrapper( self, params: dict, @@ -151,20 +163,6 @@ class IrmKmiApiClient: """Get API key.""" return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest() - def expire_cache(self) -> None: - """ - Expire items from the cache which have not been accessed since self._cache_max_age (default 2h). - Must be called regularly to clear the cache. - """ - now = time.time() - keys_to_delete = set() - for key, value in self._cache.items(): - if now - value['timestamp'] > self._cache_max_age: - keys_to_delete.add(key) - for key in keys_to_delete: - del self._cache[key] - _LOGGER.info(f"Expired {len(keys_to_delete)} elements from API cache") - class IrmKmiApiClientHa(IrmKmiApiClient): """API client for IRM KMI weather data with additional methods to integrate easily with Home Assistant""" @@ -183,22 +181,6 @@ class IrmKmiApiClientHa(IrmKmiApiClient): """ self._api_data = await self.get_forecasts_coord(coord) - def get_city(self) -> str | None: - """ - Get the city for which we currently have the forecast - - :return: city name as str or None if unavailable - """ - return self._api_data.get('cityName', None) - - def get_country(self) -> str | None: - """ - Get the two-letters country code for which we currently have the forecast - - :return: country code as str or None if unavailable - """ - return self._api_data.get('country', None) - def get_current_weather(self, tz: ZoneInfo) -> CurrentWeatherData: """ Parse the API data we currently have to build a CurrentWeatherData. @@ -272,128 +254,38 @@ class IrmKmiApiClientHa(IrmKmiApiClient): return current_weather - def _get_uv_index(self) -> float | None: - uv_index = None - module_data = self._api_data.get('module', None) - if not (module_data is None or not isinstance(module_data, list)): - for module in module_data: - if module.get('type', None) == 'uv': - uv_index = module.get('data', {}).get('levelValue') - return uv_index - - def _get_now_hourly(self, tz: ZoneInfo) -> dict | None: - now_hourly = None - hourly_forecast_data = self._api_data.get('for', {}).get('hourly') - now = datetime.now(tz) - if not (hourly_forecast_data is None - or not isinstance(hourly_forecast_data, list) - or len(hourly_forecast_data) == 0): - - for current in hourly_forecast_data[:4]: - if now.strftime('%H') == current['hour']: - now_hourly = current - break - return now_hourly - - def get_daily_forecast(self, tz: ZoneInfo, lang: str) -> List[IrmKmiForecast]: + def get_radar_forecast(self) -> List[IrmKmiRadarForecast]: """ - Parse the API data we currently have to build the daily forecast list. + Create a list of short term forecasts for rain based on the data provided by the rain radar - :param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels) - :param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en') - :return: chronologically ordered list of daily forecasts + :return: chronologically ordered list of 'few'-minutes radar forecasts """ - data = self._api_data.get('for', {}).get('daily') - if data is None or not isinstance(data, list) or len(data) == 0: + data = self._api_data.get('animation', {}) + + if not isinstance(data, dict): return [] + sequence = data.get("sequence", []) + unit = data.get("unit", {}).get("en", None) + ratios = [f['value'] / f['position'] for f in sequence if f['position'] > 0] - forecasts = list() - forecast_day = datetime.now(tz) + if len(ratios) > 0: + ratio = mean(ratios) + else: + ratio = 0 - for (idx, f) in enumerate(data): - precipitation = None - if f.get('precipQuantity', None) is not None: - try: - precipitation = float(f.get('precipQuantity')) - except (TypeError, ValueError): - pass - - native_wind_gust_speed = None - if f.get('wind', {}).get('peakSpeed') is not None: - try: - native_wind_gust_speed = int(f.get('wind', {}).get('peakSpeed')) - except (TypeError, ValueError): - pass - - wind_bearing = None - if f.get('wind', {}).get('dirText', {}).get('en') != 'VAR': - try: - wind_bearing = (float(f.get('wind', {}).get('dir')) + 180) % 360 - except (TypeError, ValueError): - pass - - is_daytime = f.get('dayNight', None) == 'd' - - day_name = f.get('dayName', {}).get('en', None) - timestamp = f.get('timestamp', None) - if timestamp is not None: - forecast_day = datetime.fromisoformat(timestamp) - elif day_name in WEEKDAYS: - forecast_day = next_weekday(forecast_day, WEEKDAYS.index(day_name)) - elif day_name in ['Today', 'Tonight']: - forecast_day = datetime.now(tz) - elif day_name == 'Tomorrow': - forecast_day = datetime.now(tz) + timedelta(days=1) - - sunrise_sec = f.get('dawnRiseSeconds', None) - if sunrise_sec is None: - sunrise_sec = f.get('sunRise', None) - sunrise = None - if sunrise_sec is not None: - try: - sunrise = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz) - + timedelta(seconds=float(sunrise_sec))) - except (TypeError, ValueError): - pass - - sunset_sec = f.get('dawnSetSeconds', None) - if sunset_sec is None: - sunset_sec = f.get('sunSet', None) - sunset = None - if sunset_sec is not None: - try: - sunset = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz) - + timedelta(seconds=float(sunset_sec))) - except (TypeError, ValueError): - pass - - forecast = IrmKmiForecast( - datetime=(forecast_day.strftime('%Y-%m-%d')), - condition=self._cdt_map.get((f.get('ww1', None), f.get('dayNight', None)), None), - condition_2=self._cdt_map.get((f.get('ww2', None), f.get('dayNight', None)), None), - condition_evol=WWEVOL_TO_ENUM_MAP.get(f.get('wwevol'), None), - native_precipitation=precipitation, - native_temperature=f.get('tempMax', None), - native_templow=f.get('tempMin', None), - native_wind_gust_speed=native_wind_gust_speed, - native_wind_speed=f.get('wind', {}).get('speed'), - precipitation_probability=f.get('precipChance', None), - wind_bearing=wind_bearing, - is_daytime=is_daytime, - text=f.get('text', {}).get(lang, ""), - sunrise=sunrise.isoformat() if sunrise is not None else None, - sunset=sunset.isoformat() if sunset is not None else None + forecast = list() + for f in sequence: + forecast.append( + IrmKmiRadarForecast( + datetime=f.get("time"), + native_precipitation=f.get('value'), + rain_forecast_max=round(f.get('positionHigher') * ratio, 2), + rain_forecast_min=round(f.get('positionLower') * ratio, 2), + might_rain=f.get('positionHigher') > 0, + unit=unit + ) ) - # Swap temperature and templow if needed - if (forecast['native_templow'] is not None - and forecast['native_temperature'] is not None - and forecast['native_templow'] > forecast['native_temperature']): - (forecast['native_templow'], forecast['native_temperature']) = \ - (forecast['native_temperature'], forecast['native_templow']) - - forecasts.append(forecast) - - return forecasts + return forecast def get_hourly_forecast(self, tz: ZoneInfo) -> List[Forecast]: """ @@ -452,38 +344,105 @@ class IrmKmiApiClientHa(IrmKmiApiClient): return forecasts - def get_radar_forecast(self) -> List[IrmKmiRadarForecast]: + def get_daily_forecast(self, tz: ZoneInfo, lang: str) -> List[IrmKmiForecast]: """ - Create a list of short term forecasts for rain based on the data provided by the rain radar + Parse the API data we currently have to build the daily forecast list. - :return: chronologically ordered list of 'few'-minutes radar forecasts + :param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels) + :param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en') + :return: chronologically ordered list of daily forecasts """ - data = self._api_data.get('animation', {}) - - if not isinstance(data, dict): + data = self._api_data.get('for', {}).get('daily') + if data is None or not isinstance(data, list) or len(data) == 0: return [] - sequence = data.get("sequence", []) - unit = data.get("unit", {}).get("en", None) - ratios = [f['value'] / f['position'] for f in sequence if f['position'] > 0] - if len(ratios) > 0: - ratio = mean(ratios) - else: - ratio = 0 + forecasts = list() + forecast_day = datetime.now(tz) - forecast = list() - for f in sequence: - forecast.append( - IrmKmiRadarForecast( - datetime=f.get("time"), - native_precipitation=f.get('value'), - rain_forecast_max=round(f.get('positionHigher') * ratio, 2), - rain_forecast_min=round(f.get('positionLower') * ratio, 2), - might_rain=f.get('positionHigher') > 0, - unit=unit - ) + for (idx, f) in enumerate(data): + precipitation = None + if f.get('precipQuantity', None) is not None: + try: + precipitation = float(f.get('precipQuantity')) + except (TypeError, ValueError): + pass + + native_wind_gust_speed = None + if f.get('wind', {}).get('peakSpeed') is not None: + try: + native_wind_gust_speed = int(f.get('wind', {}).get('peakSpeed')) + except (TypeError, ValueError): + pass + + wind_bearing = None + if f.get('wind', {}).get('dirText', {}).get('en') != 'VAR': + try: + wind_bearing = (float(f.get('wind', {}).get('dir')) + 180) % 360 + except (TypeError, ValueError): + pass + + is_daytime = f.get('dayNight', None) == 'd' + + day_name = f.get('dayName', {}).get('en', None) + timestamp = f.get('timestamp', None) + if timestamp is not None: + forecast_day = datetime.fromisoformat(timestamp) + elif day_name in WEEKDAYS: + forecast_day = self._next_weekday(forecast_day, WEEKDAYS.index(day_name)) + elif day_name in ['Today', 'Tonight']: + forecast_day = datetime.now(tz) + elif day_name == 'Tomorrow': + forecast_day = datetime.now(tz) + timedelta(days=1) + + sunrise_sec = f.get('dawnRiseSeconds', None) + if sunrise_sec is None: + sunrise_sec = f.get('sunRise', None) + sunrise = None + if sunrise_sec is not None: + try: + sunrise = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz) + + timedelta(seconds=float(sunrise_sec))) + except (TypeError, ValueError): + pass + + sunset_sec = f.get('dawnSetSeconds', None) + if sunset_sec is None: + sunset_sec = f.get('sunSet', None) + sunset = None + if sunset_sec is not None: + try: + sunset = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz) + + timedelta(seconds=float(sunset_sec))) + except (TypeError, ValueError): + pass + + forecast = IrmKmiForecast( + datetime=(forecast_day.strftime('%Y-%m-%d')), + condition=self._cdt_map.get((f.get('ww1', None), f.get('dayNight', None)), None), + condition_2=self._cdt_map.get((f.get('ww2', None), f.get('dayNight', None)), None), + condition_evol=WWEVOL_TO_ENUM_MAP.get(f.get('wwevol'), None), + native_precipitation=precipitation, + native_temperature=f.get('tempMax', None), + native_templow=f.get('tempMin', None), + native_wind_gust_speed=native_wind_gust_speed, + native_wind_speed=f.get('wind', {}).get('speed'), + precipitation_probability=f.get('precipChance', None), + wind_bearing=wind_bearing, + is_daytime=is_daytime, + text=f.get('text', {}).get(lang, ""), + sunrise=sunrise.isoformat() if sunrise is not None else None, + sunset=sunset.isoformat() if sunset is not None else None ) - return forecast + # Swap temperature and templow if needed + if (forecast['native_templow'] is not None + and forecast['native_temperature'] is not None + and forecast['native_templow'] > forecast['native_temperature']): + (forecast['native_templow'], forecast['native_temperature']) = \ + (forecast['native_temperature'], forecast['native_templow']) + + forecasts.append(forecast) + + return forecasts def get_animation_data(self, tz: ZoneInfo, lang: str, style: str, dark_mode: bool) -> RadarAnimationData: """ @@ -605,6 +564,45 @@ class IrmKmiApiClientHa(IrmKmiApiClient): return PollenParser(pollen_svg).get_pollen_data() + def get_city(self) -> str | None: + """ + Get the city for which we currently have the forecast + + :return: city name as str or None if unavailable + """ + return self._api_data.get('cityName', None) + + def get_country(self) -> str | None: + """ + Get the two-letters country code for which we currently have the forecast + + :return: country code as str or None if unavailable + """ + return self._api_data.get('country', None) + + def _get_uv_index(self) -> float | None: + uv_index = None + module_data = self._api_data.get('module', None) + if not (module_data is None or not isinstance(module_data, list)): + for module in module_data: + if module.get('type', None) == 'uv': + uv_index = module.get('data', {}).get('levelValue') + return uv_index + + def _get_now_hourly(self, tz: ZoneInfo) -> dict | None: + now_hourly = None + hourly_forecast_data = self._api_data.get('for', {}).get('hourly') + now = datetime.now(tz) + if not (hourly_forecast_data is None + or not isinstance(hourly_forecast_data, list) + or len(hourly_forecast_data) == 0): + + for current in hourly_forecast_data[:4]: + if now.strftime('%H') == current['hour']: + now_hourly = current + break + return now_hourly + @staticmethod def _merge_url_and_params(url: str, params: dict) -> str: """Merge query string params in the URL""" @@ -615,3 +613,9 @@ class IrmKmiApiClientHa(IrmKmiApiClient): new_url = parsed_url._replace(query=new_query) return str(urllib.parse.urlunparse(new_url)) + @staticmethod + def _next_weekday(current, weekday): + days_ahead = weekday - current.weekday() + if days_ahead < 0: + days_ahead += 7 + return current + timedelta(days_ahead) diff --git a/irm_kmi_api/const.py b/irm_kmi_api/const.py index a556861..bbb8934 100644 --- a/irm_kmi_api/const.py +++ b/irm_kmi_api/const.py @@ -6,6 +6,8 @@ POLLEN_NAMES: Final = {'Alder', 'Ash', 'Birch', 'Grasses', 'Hazel', 'Mugwort', ' POLLEN_LEVEL_TO_COLOR = {'null': 'green', 'low': 'yellow', 'moderate': 'orange', 'high': 'red', 'very high': 'purple', 'active': 'active'} WEEKDAYS = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] + +# TODO move those to an Enum OPTION_STYLE_STD: Final = 'standard_style' OPTION_STYLE_CONTRAST: Final = 'contrast_style' OPTION_STYLE_YELLOW_RED: Final = 'yellow_red_style' @@ -16,6 +18,7 @@ STYLE_TO_PARAM_MAP: Final = { OPTION_STYLE_YELLOW_RED: 3, OPTION_STYLE_SATELLITE: 4 } + MAP_WARNING_ID_TO_SLUG: Final = { 0: 'wind', 1: 'rain', diff --git a/irm_kmi_api/pollen.py b/irm_kmi_api/pollen.py index 601bf35..ad43c87 100644 --- a/irm_kmi_api/pollen.py +++ b/irm_kmi_api/pollen.py @@ -21,39 +21,13 @@ class PollenParser: ): self._xml = xml_string - @staticmethod - def get_default_data() -> dict: - """Return all the known pollen with 'none' value""" - return {k.lower(): 'none' for k in POLLEN_NAMES} - - @staticmethod - def get_unavailable_data() -> dict: - """Return all the known pollen with 'none' value""" - return {k.lower(): None for k in POLLEN_NAMES} - - @staticmethod - def get_option_values() -> List[str]: - """List all the values that the pollen can have""" - return list(POLLEN_LEVEL_TO_COLOR.values()) + ['none'] - - @staticmethod - def _extract_elements(root) -> List[ET.Element]: - """Recursively collect all elements of the SVG in a list""" - elements = [] - for child in root: - elements.append(child) - elements.extend(PollenParser._extract_elements(child)) - return elements - - @staticmethod - def _get_elem_text(e) -> str | None: - if e.text is not None: - return e.text.strip() - return None - def get_pollen_data(self) -> dict: - """From the XML string, parse the SVG and extract the pollen data from the image. - If an error occurs, return the default value""" + """ + Parse the SVG and extract the pollen data from the image. + If an error occurs, return the default value. + + :return: pollen dict + """ pollen_data = self.get_default_data() try: _LOGGER.debug(f"Full SVG: {self._xml}") @@ -106,3 +80,34 @@ class PollenParser: _LOGGER.debug(f"Pollen data: {pollen_data}") return pollen_data + + @staticmethod + def get_default_data() -> dict: + """Return all the known pollen with 'none' value""" + return {k.lower(): 'none' for k in POLLEN_NAMES} + + @staticmethod + def get_unavailable_data() -> dict: + """Return all the known pollen with None value""" + return {k.lower(): None for k in POLLEN_NAMES} + + @staticmethod + def get_option_values() -> List[str]: + """List all the values that the pollen can have""" + return list(POLLEN_LEVEL_TO_COLOR.values()) + ['none'] + + @staticmethod + def _extract_elements(root) -> List[ET.Element]: + """Recursively collect all elements of the SVG in a list""" + elements = [] + for child in root: + elements.append(child) + elements.extend(PollenParser._extract_elements(child)) + return elements + + @staticmethod + def _get_elem_text(e) -> str | None: + if e.text is not None: + return e.text.strip() + return None + diff --git a/irm_kmi_api/rain_graph.py b/irm_kmi_api/rain_graph.py index 4f76727..4182d9b 100644 --- a/irm_kmi_api/rain_graph.py +++ b/irm_kmi_api/rain_graph.py @@ -4,7 +4,7 @@ import base64 import copy import datetime import logging -from typing import Any, List, Self +from typing import List, Self import async_timeout from svgwrite import Drawing @@ -20,6 +20,8 @@ _LOGGER = logging.getLogger(__name__) class RainGraph: + """Create and get rain radar animated SVG""" + def __init__(self, animation_data: RadarAnimationData, country: str, @@ -77,74 +79,110 @@ class RainGraph: async def build(self) -> Self: """Build the rain graph by calling all the method in the right order. Returns self when done""" - await self.draw_svg_frame() - self.draw_hour_bars() - self.draw_chances_path() - self.draw_data_line() - self.write_hint() - await self.insert_background() + await self._draw_svg_frame() + self._draw_hour_bars() + self._draw_chances_path() + self._draw_data_line() + self._write_hint() + await self._insert_background() self._dwg_save = copy.deepcopy(self._dwg) return self async def get_animated(self) -> bytes: - """Get the animated SVG. If called for the first time since refresh, downloads the images to build the file.""" + """ + Get the animated SVG. If called for the first time, downloads the cloud images to build the file. + + :return: utf-8 encoded animated SVG string + :raises: ValueError if build() was not called before + """ + + if self._dwg_save is None: + raise ValueError("You need to call .build() before getting the SVG") _LOGGER.info(f"Get animated with _dwg_animated {self._dwg_animated}") if self._dwg_animated is None: - clouds = self.download_clouds() + clouds = self._download_clouds() self._dwg = copy.deepcopy(self._dwg_save) - self.draw_current_fame_line() - self.draw_description_text() + self._draw_current_fame_line() + self._draw_description_text() try: await clouds - self.insert_cloud_layer() + self._insert_cloud_layer() except IrmKmiApiError as err: _LOGGER.warning(f"Could not download clouds from API: {err}") - await self.draw_location() + await self._draw_location() self._dwg_animated = self._dwg - return self.get_svg_string(still_image=False) + return self._get_svg_string(still_image=False) async def get_still(self) -> bytes: - """Get the animated SVG. If called for the first time since refresh, downloads the images to build the file.""" + """ + Get the still SVG. If called for the first time, downloads the cloud images to build the file. + + :return: utf-8 encoded SVG string + :raises: ValueError if build() was not called before + """ + + if self._dwg_save is None: + raise ValueError("You need to call .build() before getting the SVG") + _LOGGER.info(f"Get still with _dwg_still {self._dwg_still}") if self._dwg_still is None: idx = self._animation_data['most_recent_image_idx'] - cloud = self.download_clouds(idx) + cloud = self._download_clouds(idx) self._dwg = copy.deepcopy(self._dwg_save) - self.draw_current_fame_line(idx) - self.draw_description_text(idx) + self._draw_current_fame_line(idx) + self._draw_description_text(idx) try: await cloud - self.insert_cloud_layer(idx) + self._insert_cloud_layer(idx) except IrmKmiApiError as err: _LOGGER.warning(f"Could not download clouds from API: {err}") - await self.draw_location() + await self._draw_location() self._dwg_still = self._dwg - return self.get_svg_string(still_image=True) + return self._get_svg_string(still_image=True) - async def download_clouds(self, idx = None): + def get_hint(self) -> str: + """ + Get hint to display on the rain graph + :return: hint sentence as str + """ + return self._animation_data.get('hint', '') + + async def _download_clouds(self, idx=None): + """ + Download cloud images and save the result in the internal state. + + :param idx: index of the image to download (if not specified, downloads all the images) + :raises: IrmKmiApiError if communication with the API fails + """ imgs = [e['image'] for e in self._animation_data['sequence']] if idx is not None and type(imgs[idx]) is str: _LOGGER.info("Download single cloud image") print("Download single cloud image") - result = await self.download_images_from_api([imgs[idx]]) + result = await self._download_images_from_api([imgs[idx]]) self._animation_data['sequence'][idx]['image'] = result[0] else: _LOGGER.info("Download many cloud images") - result = await self.download_images_from_api([img for img in imgs if type(img) is str]) + result = await self._download_images_from_api([img for img in imgs if type(img) is str]) for i in range(len(self._animation_data['sequence'])): if type(self._animation_data['sequence'][i]['image']) is str: self._animation_data['sequence'][i]['image'] = result[0] result = result[1:] - async def download_images_from_api(self, urls: list[str]) -> list[Any]: - """Download a batch of images to create the radar frames.""" + async def _download_images_from_api(self, urls: list[str]) -> list[bytes]: + """ + Download a batch of images to create the radar frames. + + :param urls: list of urls to download + :return: list images downloaded as bytes + :raises: IrmKmiApiError if communication with the API fails + """ coroutines = list() for url in urls: @@ -155,10 +193,7 @@ class RainGraph: _LOGGER.info(f"Just downloaded {len(images_from_api)} images") return images_from_api - def get_hint(self) -> str: - return self._animation_data.get('hint', None) - - async def draw_svg_frame(self): + async def _draw_svg_frame(self): """Create the global area to draw the other items""" mimetype = "application/x-font-ttf" @@ -167,9 +202,7 @@ class RainGraph: self._dwg.embed_stylesheet(content) self._dwg.embed_stylesheet(""" - .roboto { - font-family: "Roboto Medium"; - } + .roboto { font-family: "Roboto Medium"; } """) fill_color = '#393C40' if self._dark_mode else '#385E95' @@ -178,7 +211,7 @@ class RainGraph: rx=None, ry=None, fill=fill_color, stroke='none')) - def draw_description_text(self, idx: int | None = None): + def _draw_description_text(self, idx: int | None = None): """For every frame write the amount of precipitation and the time at the top of the graph. If idx is set, only do it for the given idx""" @@ -192,7 +225,7 @@ class RainGraph: paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) - self.write_time_and_rain(paragraph, rain_level, time) + self._write_time_and_rain(paragraph, rain_level, time) return for i in range(self._frame_count): @@ -212,9 +245,9 @@ class RainGraph: repeatCount="indefinite" )) - self.write_time_and_rain(paragraph, rain_level, time) + self._write_time_and_rain(paragraph, rain_level, time) - def write_time_and_rain(self, paragraph, rain_level, time): + def _write_time_and_rain(self, paragraph, rain_level, time): """Using the paragraph object, write the time and rain level data""" paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos), text_anchor="start", @@ -227,11 +260,11 @@ class RainGraph: fill="white", stroke='none')) - def write_hint(self): + def _write_hint(self): """Add the hint text at the bottom of the graph""" paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) - hint = self._animation_data['hint'] + hint = self.get_hint() paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos), text_anchor="middle", @@ -239,7 +272,7 @@ class RainGraph: fill="white", stroke='none')) - def draw_chances_path(self): + def _draw_chances_path(self): """Draw the prevision margin area around the main forecast line""" list_lower_points = [] list_higher_points = [] @@ -264,36 +297,9 @@ class RainGraph: graph_rect_right -= self._interval_width if list_higher_points and list_lower_points: - self.draw_chance_precip(list_higher_points, list_lower_points) + self._draw_chance_precip(list_higher_points, list_lower_points) - def draw_chance_precip(self, list_higher_points: List, list_lower_points: List): - """Draw the blue solid line representing the actual rain forecast""" - precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3) - - list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last']) - - self.set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points) - self._dwg.add(precip_higher_chance_path) - - @staticmethod - def set_curved_path(path, points): - """Pushes points on the path by creating a nice curve between them""" - if len(points) < 2: - return - - path.push('M', *points[0]) - - for i in range(1, len(points)): - x_mid = (points[i - 1][0] + points[i][0]) / 2 - y_mid = (points[i - 1][1] + points[i][1]) / 2 - - path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid) - if points[i][-1] == 'last' or points[i - 1][-1] == 'last': - path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1]) - - path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1]) - - def draw_data_line(self): + def _draw_data_line(self): """Draw the main data line for the rain forecast""" rain_list: List[AnimationFrameData] = self._animation_data['sequence'] graph_rect_left = self._offset @@ -308,10 +314,10 @@ class RainGraph: graph_rect_top + (1.0 - position) * self._graph_height)) graph_rect_left += self._interval_width data_line_path = self._dwg.path(fill='none', stroke='#63c8fa', stroke_width=2) - self.set_curved_path(data_line_path, entry_list) + self._set_curved_path(data_line_path, entry_list) self._dwg.add(data_line_path) - def draw_hour_bars(self): + def _draw_hour_bars(self): """Draw the small bars at the bottom to represent the time""" hour_bar_height = 8 horizontal_inset = self._offset @@ -350,7 +356,7 @@ class RainGraph: end=(self._graph_width + self._interval_width / 2, self._graph_bottom), stroke='white')) - def draw_current_fame_line(self, idx: int | None = None): + def _draw_current_fame_line(self, idx: int | None = None): """Draw a solid white line on the timeline at the position of the given frame index""" x_position = self._offset if idx is None else self._offset + idx * self._interval_width now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space), @@ -368,15 +374,21 @@ class RainGraph: dur=f"{self._frame_count * 0.3}s", repeatCount="indefinite")) - def get_svg_string(self, still_image: bool = False) -> bytes: + def _get_svg_string(self, still_image: bool = False) -> bytes: + """ + Get the utf-8 encoded string representing the SVG + + :param still_image: if true the non-animated version is returned + :return: utf-8 encoded string + """ return self._dwg_still.tostring().encode() if still_image else self._dwg_animated.tostring().encode() - async def insert_background(self): - png_data = self.get_background_png_b64() + async def _insert_background(self): + png_data = self._get_background_png_b64() image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) - def insert_cloud_layer(self, idx: int | None = None): + def _insert_cloud_layer(self, idx: int | None = None): imgs = [e['image'] for e in self._animation_data['sequence']] if idx is not None: @@ -402,23 +414,22 @@ class RainGraph: repeatCount="indefinite" )) - async def draw_location(self): + async def _draw_location(self): img = self._animation_data['location'] _LOGGER.info(f"Draw location layer with img of type {type(img)}") if type(img) is str: - result = await self.download_images_from_api([img]) + result = await self._download_images_from_api([img]) img = result[0] self._animation_data['location'] = img png_data = base64.b64encode(img).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) - def get_dwg(self): + def _get_dwg(self) -> Drawing: return copy.deepcopy(self._dwg) - def get_background_png_b64(self): - _LOGGER.debug(f"Get b64 for {self._country} {self._style} {'dark' if self._dark_mode else 'light'} mode") + def _get_background_png_b64(self) -> str: if self._country == 'NL': return nl.nl_b64 elif self._style == OPTION_STYLE_SATELLITE: @@ -427,3 +438,30 @@ class RainGraph: return be_black.be_black_b64 else: return be_white.be_white_b64 + + def _draw_chance_precip(self, list_higher_points: List, list_lower_points: List): + """Draw the blue solid line representing the actual rain forecast""" + precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3) + + list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last']) + + self._set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points) + self._dwg.add(precip_higher_chance_path) + + @staticmethod + def _set_curved_path(path, points): + """Pushes points on the path by creating a nice curve between them""" + if len(points) < 2: + return + + path.push('M', *points[0]) + + for i in range(1, len(points)): + x_mid = (points[i - 1][0] + points[i][0]) / 2 + y_mid = (points[i - 1][1] + points[i][1]) / 2 + + path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid) + if points[i][-1] == 'last' or points[i - 1][-1] == 'last': + path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1]) + + path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1]) diff --git a/irm_kmi_api/utils.py b/irm_kmi_api/utils.py deleted file mode 100644 index 0155fec..0000000 --- a/irm_kmi_api/utils.py +++ /dev/null @@ -1,8 +0,0 @@ -from datetime import timedelta - - -def next_weekday(current, weekday): - days_ahead = weekday - current.weekday() - if days_ahead < 0: - days_ahead += 7 - return current + timedelta(days_ahead) diff --git a/tests/test_rain_graph.py b/tests/test_rain_graph.py index feadef0..81d42f5 100644 --- a/tests/test_rain_graph.py +++ b/tests/test_rain_graph.py @@ -48,9 +48,9 @@ async def test_svg_frame_setup(): style='STD', ) - await rain_graph.draw_svg_frame() + await rain_graph._draw_svg_frame() - svg_str = rain_graph.get_dwg().tostring() + svg_str = rain_graph._get_dwg().tostring() with open("irm_kmi_api/resources/roboto_medium.ttf", "rb") as file: font_b64 = base64.b64encode(file.read()).decode('utf-8') @@ -68,9 +68,9 @@ def test_svg_hint(): style='STD', ) - rain_graph.write_hint() + rain_graph._write_hint() - svg_str = rain_graph.get_dwg().tostring() + svg_str = rain_graph._get_dwg().tostring() assert "Testing SVG camera" in svg_str @@ -84,9 +84,9 @@ def test_svg_time_bars(): style='STD', ) - rain_graph.draw_hour_bars() + rain_graph._draw_hour_bars() - svg_str = rain_graph.get_dwg().tostring() + svg_str = rain_graph._get_dwg().tostring() assert "19h" in svg_str assert "20h" in svg_str @@ -103,9 +103,9 @@ def test_draw_chances_path(): style='STD', ) - rain_graph.draw_chances_path() + rain_graph._draw_chances_path() - svg_str = rain_graph.get_dwg().tostring() + svg_str = rain_graph._get_dwg().tostring() assert 'fill="#63c8fa"' in svg_str assert 'opacity="0.3"' in svg_str @@ -121,9 +121,9 @@ def test_draw_data_line(): style='STD', ) - rain_graph.draw_data_line() + rain_graph._draw_data_line() - svg_str = rain_graph.get_dwg().tostring() + svg_str = rain_graph._get_dwg().tostring() assert 'fill="none"' in svg_str assert 'stroke-width="2"' in svg_str @@ -139,12 +139,12 @@ async def test_insert_background(): style='STD', ) - await rain_graph.insert_background() + await rain_graph._insert_background() with open("irm_kmi_api/resources/be_white.png", "rb") as file: png_b64 = base64.b64encode(file.read()).decode('utf-8') - svg_str = rain_graph.get_dwg().tostring() + svg_str = rain_graph._get_dwg().tostring() assert png_b64 in svg_str assert "